From f6ff9605ff408e97b840ea476dcf23f24031954a Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 16 Jul 2025 23:52:11 +0300 Subject: [PATCH 1/4] IGNITE-25774 Extension to write CDC data to PostgreSQL --- .../change-data-capture-extensions.adoc | 207 ++++++++++++++++++ 1 file changed, 207 insertions(+) diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc index de30adaf9598f..8b0460bf6273a 100644 --- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc +++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc @@ -337,3 +337,210 @@ Configuration is done via Ignite node plugin: ``` + +== CDC replication to PostgreSql + +`IgniteToPostgreSqlCdcConsumer` is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL. +It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data changes (`insert`, `update`, `delete`) in specified caches and apply them to PostgreSQL. + +== Key Features + +- Per-cache replication (only selected caches are replicated) +- `onlyPrimary` support (replicates only from primary nodes) +- Auto table creation in PostgreSQL if needed (`createTables=true`) +- Batch replication (`maxBatchSize`) +- User-defined `DataSource` — user configures reliability and transactional guarantees + +== Configuration + +Spring XML configuration example (`ignite-to-postgres.xml`): + +[source,xml] +---- + + + + + + T1 + T2 + + + + + + + + + +---- + +[WARNING] +==== +Choosing the `dataSource` is the user's responsibility. Consider: +- Required delivery guarantees (e.g., retry logic) +- High-availability PostgreSQL setup (replicas, failover, etc.) +==== + +== Example `dataSource` + +[source,xml] +---- + + + + + + + + + + +---- + +== Schema Conversion + +Table schema in PostgreSQL is generated from the `QueryEntity` configured in Ignite cache. +Only **one QueryEntity is supported per cache** and is used to generate DDL and DML operations. + +Schema creation occurs once on the first `CdcCacheEvent` if `createTables=true`. + +=== Example: Schema from Ignite to PostgreSQL + +[source,java] +---- +class TestVal { + private final String name; + private final int val; +} + +QueryEntity qryEntity = new QueryEntity() + .setTableName("test_table") + .setKeyFieldName("id") + .setValueType("demo.TestVal") + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("name", String.class.getName(), null) + .addQueryField("val", Integer.class.getName(), null); + +ignite.getOrCreateCache(new CacheConfiguration("test_table") + .setQueryEntities(List.of(qryEntity))); +---- + +→ PostgreSQL: + +[source,sql] +---- +CREATE TABLE test_table ( + id INT PRIMARY KEY, + name VARCHAR, + val INT +); +---- + +=== Composite Key Example + +[source,java] +---- +class TestKey { + private final int id; + private final String subId; +} + +class TestVal { + private final String name; + private final int val; +} + +QueryEntity qryEntity = new QueryEntity() + .setTableName("test_table") + .setKeyFields(Set.of("id", "subId")) + .setValueType("demo.TestVal") + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("subId", String.class.getName(), null) + .addQueryField("name", String.class.getName(), null) + .addQueryField("val", Integer.class.getName(), null); + +ignite.getOrCreateCache(new CacheConfiguration("test_table") + .setQueryEntities(List.of(qryEntity))); +---- + +→ PostgreSQL: + +[source,sql] +---- +CREATE TABLE test_table ( + id INT, + subId VARCHAR, + name VARCHAR, + val INT, + PRIMARY KEY (id, subId) +); +---- + +== Insert / Update / Delete Events + +Insert, update, and delete operations are handled via `CdcEvent`. + +=== Upsert with Version Conflict Resolution + +Each insert/update is translated into an `INSERT ... ON CONFLICT DO UPDATE` query, with version-based conflict resolution. + +[NOTE] +==== +A `version` column is automatically added and stored as `BYTEA`. + +This version is a 16-byte array based on `CacheEntryVersion` encoded in big-endian order: + +- 4 bytes — `topologyVersion` (int) +- 8 bytes — `order` (long) +- 4 bytes — `nodeOrder` (int) + +This allows PostgreSQL to compare versions lexicographically: + +[source,sql] +---- +INSERT INTO test_table (id, name, val, version) +VALUES (1, 'value', 5, E'\x...') +ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, + val = EXCLUDED.val +WHERE test_table.version < EXCLUDED.version; +---- +==== + +=== Delete Example + +[source,sql] +---- +DELETE FROM test_table WHERE id = 1; +---- + +== Java → PostgreSQL Type Mapping + +|=== +| Java Type | PostgreSQL Type + +| java.lang.String | VARCHAR +| java.lang.Integer / int | INT +| java.lang.Long / long | BIGINT +| java.lang.Boolean / boolean | BOOLEAN +| java.lang.Double / double | DOUBLE PRECISION +| java.lang.Float / float | REAL +| java.math.BigDecimal | DECIMAL +| java.lang.Short / short | SMALLINT +| java.lang.Byte / byte | SMALLINT +| java.util.UUID | UUID +| byte[] (`[B`) | BYTEA +| java.lang.Object | OTHER +|=== + +[NOTE] +==== +`OTHER` is used as fallback if type is not recognized. +==== + +== Limitations + +- Only BinaryObject and primitive fields are supported +- `keepBinary` must be set to `true` +- Schema evolution is not supported — run with `createTables=true` at startup From 15c4d29977abdbfdfbf3f95f296de994811c3a75 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Sat, 19 Jul 2025 14:46:23 +0300 Subject: [PATCH 2/4] IGNITE-25774 Extension to write CDC data to PostgreSQL --- .../change-data-capture-extensions.adoc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc index 8b0460bf6273a..d78a39fafb98d 100644 --- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc +++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc @@ -375,6 +375,24 @@ Spring XML configuration example (`ignite-to-postgres.xml`): ---- +=== Configuration Options + +The following settings can be used to configure the behavior of `IgniteToPostgreSqlCdcConsumer`: + +[cols="1,2,1", options="header"] +|=== +| Setting | Description | Default +| `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_ +| `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_ +| `onlyPrimary` | If `true`, replicates only events originating from the primary node. Useful to avoid duplicate updates in replicated clusters. | `true` +| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects transaction commit size. | `1024` +| `autoCommit` | If `true`, each batch is committed immediately after submission. If false, batches accumulate and are committed by the connector after finishing the last WAL segment | `false` +| `createTables`| If `true`, missing target tables in PostgreSQL will be created automatically during startup.| `false` +|=== + + If `autoCommit` is disabled (`false`), batches are accumulated and committed either manually or by the connector after processing a group of batches. + + [WARNING] ==== Choosing the `dataSource` is the user's responsibility. Consider: From a5b6ac44ba4d166e708c096e09f8961a87d372d7 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 23 Jul 2025 22:35:44 +0300 Subject: [PATCH 3/4] IGNITE-25774 maxCommitSize setting --- .../change-data-capture-extensions.adoc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc index d78a39fafb98d..e1caf6772b9e5 100644 --- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc +++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc @@ -348,7 +348,7 @@ It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data chan - Per-cache replication (only selected caches are replicated) - `onlyPrimary` support (replicates only from primary nodes) - Auto table creation in PostgreSQL if needed (`createTables=true`) -- Batch replication (`maxBatchSize`) +- Batch replication (`maxBatchSize` and `maxCommitSize`) - User-defined `DataSource` — user configures reliability and transactional guarantees == Configuration @@ -367,6 +367,7 @@ Spring XML configuration example (`ignite-to-postgres.xml`): + @@ -385,12 +386,13 @@ The following settings can be used to configure the behavior of `IgniteToPostgre | `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_ | `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_ | `onlyPrimary` | If `true`, replicates only events originating from the primary node. Useful to avoid duplicate updates in replicated clusters. | `true` -| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects transaction commit size. | `1024` +| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are sent in a single `executeBatch()` call. | `1024` | +| `maxCommitSize` | Maximum number of batches executed before committing the current transaction (when auto-commit is disabled). Controls how many `executeBatch()` calls are grouped into one transaction. | `10` | | `autoCommit` | If `true`, each batch is committed immediately after submission. If false, batches accumulate and are committed by the connector after finishing the last WAL segment | `false` | `createTables`| If `true`, missing target tables in PostgreSQL will be created automatically during startup.| `false` |=== - If `autoCommit` is disabled (`false`), batches are accumulated and committed either manually or by the connector after processing a group of batches. + If `autoCommit` is disabled (`false`), batches are accumulated and committed either manually or by the connector after processing a group of batches. If `autoCommit` is enabled (`true`), each batch is committed immediately, and `maxCommitSize` has no effect. [WARNING] From 8e84f652d32fa8363d77409b0748e24442cf567f Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Mon, 28 Jul 2025 21:38:57 +0300 Subject: [PATCH 4/4] IGNITE-25774 * maxCommitSize setting removed --- .../change-data-capture-extensions.adoc | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc index e1caf6772b9e5..339781b5c91cc 100644 --- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc +++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc @@ -348,7 +348,7 @@ It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data chan - Per-cache replication (only selected caches are replicated) - `onlyPrimary` support (replicates only from primary nodes) - Auto table creation in PostgreSQL if needed (`createTables=true`) -- Batch replication (`maxBatchSize` and `maxCommitSize`) +- Batch replication (`batchSize`) - User-defined `DataSource` — user configures reliability and transactional guarantees == Configuration @@ -366,8 +366,7 @@ Spring XML configuration example (`ignite-to-postgres.xml`): T2 - - + @@ -386,13 +385,11 @@ The following settings can be used to configure the behavior of `IgniteToPostgre | `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_ | `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_ | `onlyPrimary` | If `true`, replicates only events originating from the primary node. Useful to avoid duplicate updates in replicated clusters. | `true` -| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are sent in a single `executeBatch()` call. | `1024` | -| `maxCommitSize` | Maximum number of batches executed before committing the current transaction (when auto-commit is disabled). Controls how many `executeBatch()` calls are grouped into one transaction. | `10` | -| `autoCommit` | If `true`, each batch is committed immediately after submission. If false, batches accumulate and are committed by the connector after finishing the last WAL segment | `false` +| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are commited in a single `executeBatch()` call. | `1024` | | `createTables`| If `true`, missing target tables in PostgreSQL will be created automatically during startup.| `false` |=== - If `autoCommit` is disabled (`false`), batches are accumulated and committed either manually or by the connector after processing a group of batches. If `autoCommit` is enabled (`true`), each batch is committed immediately, and `maxCommitSize` has no effect. +We use `PreparedStatement` for batching with `autoCommit` set to `false`, committing manually after each batch execution. [WARNING]