Skip to content

Commit c68b98c

Browse files
fix: use psycopg text array decoding in wal2json messages (#490)
Slack thread: https://meltano.slack.com/archives/C06A1MD6A6L/p1724619858505739 When using log based replication, the wal2json output for columns of array types returns a string encoded in sql format. Ex: '{a,b}' The records produced by the tap in this sutation fails the target schema validation, since the schema is of type array and the value is a string. I included a test case for it. Please feel free to modify the PR, I am by no means a python developer. Related: - eulerto/wal2json#221 (comment) --------- Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com>
1 parent ba7ad87 commit c68b98c

File tree

2 files changed

+64
-5
lines changed

2 files changed

+64
-5
lines changed

tap_postgres/client.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, Any]]:
429429
while True:
430430
message = logical_replication_cursor.read_message()
431431
if message:
432-
row = self.consume(message)
432+
row = self.consume(message, logical_replication_cursor)
433433
if row:
434434
yield row
435435
else:
@@ -456,7 +456,7 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, Any]]:
456456
logical_replication_cursor.close()
457457
logical_replication_connection.close()
458458

459-
def consume(self, message) -> dict | None:
459+
def consume(self, message, cursor) -> dict | None:
460460
"""Ingest WAL message."""
461461
try:
462462
message_payload = json.loads(message.payload)
@@ -476,12 +476,12 @@ def consume(self, message) -> dict | None:
476476

477477
if message_payload["action"] in upsert_actions:
478478
for column in message_payload["columns"]:
479-
row.update({column["name"]: column["value"]})
479+
row.update({column["name"]: self._parse_column_value(column, cursor)})
480480
row.update({"_sdc_deleted_at": None})
481481
row.update({"_sdc_lsn": message.data_start})
482482
elif message_payload["action"] in delete_actions:
483483
for column in message_payload["identity"]:
484-
row.update({column["name"]: column["value"]})
484+
row.update({column["name"]: self._parse_column_value(column, cursor)})
485485
row.update(
486486
{
487487
"_sdc_deleted_at": datetime.datetime.utcnow().strftime(
@@ -517,6 +517,15 @@ def consume(self, message) -> dict | None:
517517

518518
return row
519519

520+
def _parse_column_value(self, column, cursor):
521+
# When using log based replication, the wal2json output for columns of
522+
# array types returns a string encoded in sql format, e.g. '{a,b}'
523+
# https://github.com/eulerto/wal2json/issues/221#issuecomment-1025143441
524+
if column["type"] == "text[]":
525+
return psycopg2.extensions.STRINGARRAY(column["value"], cursor)
526+
527+
return column["value"]
528+
520529
def logical_replication_connection(self):
521530
"""A logical replication connection to the database.
522531

tests/test_log_based.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22

33
import sqlalchemy as sa
4-
from sqlalchemy.dialects.postgresql import BIGINT, TEXT
4+
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, TEXT
55

66
from tap_postgres.tap import TapPostgres
77
from tests.test_core import PostgresTestRunner
@@ -57,3 +57,53 @@ def test_null_append():
5757
tap_class=TapPostgres, config=LOG_BASED_CONFIG, catalog=tap_catalog
5858
)
5959
test_runner.sync_all()
60+
61+
62+
def test_string_array_column():
63+
"""LOG_BASED syncs failed with array columns.
64+
65+
This test checks that even when a catalog contains properties with types represented
66+
as arrays (ex: "text[]") LOG_BASED replication can properly decode their value.
67+
"""
68+
table_name = "test_array_column"
69+
engine = sa.create_engine(
70+
"postgresql://postgres:postgres@localhost:5434/postgres", future=True
71+
)
72+
73+
metadata_obj = sa.MetaData()
74+
table = sa.Table(
75+
table_name,
76+
metadata_obj,
77+
sa.Column("id", BIGINT, primary_key=True),
78+
sa.Column("data", ARRAY(TEXT), nullable=True),
79+
)
80+
with engine.begin() as conn:
81+
table.drop(conn, checkfirst=True)
82+
metadata_obj.create_all(conn)
83+
insert = table.insert().values(id=123, data=["1", "2"])
84+
conn.execute(insert)
85+
insert = table.insert().values(id=321, data=['This is a "test"', "2"])
86+
conn.execute(insert)
87+
88+
tap = TapPostgres(config=LOG_BASED_CONFIG)
89+
tap_catalog = json.loads(tap.catalog_json_text)
90+
altered_table_name = f"public-{table_name}"
91+
92+
for stream in tap_catalog["streams"]:
93+
if stream.get("stream") and altered_table_name not in stream["stream"]:
94+
for metadata in stream["metadata"]:
95+
metadata["metadata"]["selected"] = False
96+
else:
97+
stream["replication_method"] = "LOG_BASED"
98+
stream["replication_key"] = "_sdc_lsn"
99+
for metadata in stream["metadata"]:
100+
metadata["metadata"]["selected"] = True
101+
if metadata["breadcrumb"] == []:
102+
metadata["metadata"]["replication-method"] = "LOG_BASED"
103+
104+
test_runner = PostgresTestRunner(
105+
tap_class=TapPostgres, config=LOG_BASED_CONFIG, catalog=tap_catalog
106+
)
107+
test_runner.sync_all()
108+
assert test_runner.record_messages[0]["record"]["data"] == ["1", "2"]
109+
assert test_runner.record_messages[1]["record"]["data"] == ['This is a "test"', "2"]

0 commit comments

Comments
 (0)