Skip to content

Commit 041378f

Browse files
committed
Directly handle CSV generation
Use the type bind processors to generate values to ensure that values are represented correctly to PostgreSQL, especially for things like ARRAY and JSONB. Also, handle null values.
1 parent 37a9de5 commit 041378f

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

target_postgres/sinks.py

Lines changed: 20 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -1,6 +1,5 @@
11
"""Postgres target sink class, which handles writing streams."""
22

3-
import csv
43
import uuid
54
from io import StringIO
65
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union, cast
@@ -164,10 +163,27 @@ def bulk_insert_records( # type: ignore[override]
164163
for record in records
165164
]
166165

167-
# Prepare a buffer with the values as csv.
166+
# Prepare processor functions for each column type. These are used to convert
167+
# from Python values to database values.
168+
column_processors = [
169+
column.type.bind_processor(connection.dialect) or str for column in columns
170+
]
171+
172+
# Create a buffer of CSV formatted values to send in bulk.
168173
buffer = StringIO()
169-
writer = csv.writer(buffer)
170-
writer.writerows(data_to_insert)
174+
for row in data_to_insert:
175+
processed_row = ",".join(
176+
map(
177+
lambda data, proc: (
178+
"" if data is None else str(proc(data)).replace('"', '""')
179+
),
180+
row,
181+
column_processors,
182+
)
183+
)
184+
185+
buffer.write(processed_row)
186+
buffer.write("\n")
171187
buffer.seek(0)
172188

173189
with connection.connection.cursor() as cur:

0 commit comments

Comments (0)