Skip to content

IGNITE-25774 Extension to write CDC data to PostgreSQL documentation #12193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,227 @@ Configuration is done via Ignite node plugin:
</bean>
</property>
```

== CDC replication to PostgreSql

`IgniteToPostgreSqlCdcConsumer` is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL.
It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data changes (`insert`, `update`, `delete`) in specified caches and apply them to PostgreSQL.

== Key Features

- Per-cache replication (only selected caches are replicated)
- `onlyPrimary` support (replicates only from primary nodes)
- Auto table creation in PostgreSQL if needed (`createTables=true`)
- Batch replication (`batchSize`)
- User-defined `DataSource` — user configures reliability and transactional guarantees

== Configuration

Spring XML configuration example (`ignite-to-postgres.xml`):

[source,xml]
----
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
<property name="consumer">
<bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
<property name="caches">
<list>
<value>T1</value>
<value>T2</value>
</list>
</property>
<property name="batchSize" value="1024" />
<property name="onlyPrimary" value="true" />
<property name="createTables" value="true" />
<property name="dataSource" ref="dataSource" />
</bean>
</property>
</bean>
----

=== Configuration Options

The following settings can be used to configure the behavior of `IgniteToPostgreSqlCdcConsumer`:

[cols="1,2,1", options="header"]
|===
| Setting | Description | Default
| `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_
| `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_
| `onlyPrimary` | If `true`, replicates only events originating from the primary node. Useful to avoid duplicate updates in replicated clusters. | `true`
| `maxBatchSize` | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are commited in a single `executeBatch()` call. | `1024` |
| `createTables`| If `true`, missing target tables in PostgreSQL will be created automatically during startup.| `false`
|===

We use `PreparedStatement` for batching with `autoCommit` set to `false`, committing manually after each batch execution.


[WARNING]
====
Choosing the `dataSource` is the user's responsibility. Consider:
- Required delivery guarantees (e.g., retry logic)
- High-availability PostgreSQL setup (replicas, failover, etc.)
====

== Example `dataSource`

[source,xml]
----
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="jdbc:postgresql://localhost:5432/ignite_replica"/>
<property name="username" value="ignite_user"/>
<property name="password" value="secret"/>
<property name="initialSize" value="3"/>
<property name="maxTotal" value="10"/>
<property name="validationQuery" value="SELECT 1"/>
<property name="testOnBorrow" value="true"/>
</bean>
----

== Schema Conversion

Table schema in PostgreSQL is generated from the `QueryEntity` configured in Ignite cache.
Only **one QueryEntity is supported per cache** and is used to generate DDL and DML operations.

Schema creation occurs once on the first `CdcCacheEvent` if `createTables=true`.

=== Example: Schema from Ignite to PostgreSQL

[source,java]
----
class TestVal {
private final String name;
private final int val;
}

QueryEntity qryEntity = new QueryEntity()
.setTableName("test_table")
.setKeyFieldName("id")
.setValueType("demo.TestVal")
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("name", String.class.getName(), null)
.addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<Integer, TestVal>("test_table")
.setQueryEntities(List.of(qryEntity)));
----

→ PostgreSQL:

[source,sql]
----
CREATE TABLE test_table (
id INT PRIMARY KEY,
name VARCHAR,
val INT
);
----

=== Composite Key Example

[source,java]
----
class TestKey {
private final int id;
private final String subId;
}

class TestVal {
private final String name;
private final int val;
}

QueryEntity qryEntity = new QueryEntity()
.setTableName("test_table")
.setKeyFields(Set.of("id", "subId"))
.setValueType("demo.TestVal")
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("subId", String.class.getName(), null)
.addQueryField("name", String.class.getName(), null)
.addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<TestKey, TestVal>("test_table")
.setQueryEntities(List.of(qryEntity)));
----

→ PostgreSQL:

[source,sql]
----
CREATE TABLE test_table (
id INT,
subId VARCHAR,
name VARCHAR,
val INT,
PRIMARY KEY (id, subId)
);
----

== Insert / Update / Delete Events

Insert, update, and delete operations are handled via `CdcEvent`.

=== Upsert with Version Conflict Resolution

Each insert/update is translated into an `INSERT ... ON CONFLICT DO UPDATE` query, with version-based conflict resolution.

[NOTE]
====
A `version` column is automatically added and stored as `BYTEA`.

This version is a 16-byte array based on `CacheEntryVersion` encoded in big-endian order:

- 4 bytes — `topologyVersion` (int)
- 8 bytes — `order` (long)
- 4 bytes — `nodeOrder` (int)

This allows PostgreSQL to compare versions lexicographically:

[source,sql]
----
INSERT INTO test_table (id, name, val, version)
VALUES (1, 'value', 5, E'\x...')
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
val = EXCLUDED.val
WHERE test_table.version < EXCLUDED.version;
----
====

=== Delete Example

[source,sql]
----
DELETE FROM test_table WHERE id = 1;
----

== Java → PostgreSQL Type Mapping

|===
| Java Type | PostgreSQL Type

| java.lang.String | VARCHAR
| java.lang.Integer / int | INT
| java.lang.Long / long | BIGINT
| java.lang.Boolean / boolean | BOOLEAN
| java.lang.Double / double | DOUBLE PRECISION
| java.lang.Float / float | REAL
| java.math.BigDecimal | DECIMAL
| java.lang.Short / short | SMALLINT
| java.lang.Byte / byte | SMALLINT
| java.util.UUID | UUID
| byte[] (`[B`) | BYTEA
| java.lang.Object | OTHER
|===

[NOTE]
====
`OTHER` is used as fallback if type is not recognized.
====

== Limitations

- Only BinaryObject and primitive fields are supported
- `keepBinary` must be set to `true`
- Schema evolution is not supported — run with `createTables=true` at startup