Skip to content

Commit 711d9af

Browse files
committed
Bulk insert data using copy
Use copy instead of insert to bulk insert records. In PostgreSQL, copy is the fastest way to insert bulk data.
1 parent 25ff7cb commit 711d9af

File tree

1 file changed

+24
-22
lines changed

1 file changed

+24
-22
lines changed

target_postgres/sinks.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Postgres target sink class, which handles writing streams."""
22

3+
import csv
34
import uuid
4-
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union, cast
5+
from io import StringIO
6+
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union, cast
57

68
import sqlalchemy as sa
79
from pendulum import now
@@ -142,35 +144,35 @@ def bulk_insert_records( # type: ignore[override]
142144
True if table exists, False if not, None if unsure or undetectable.
143145
"""
144146
columns = self.column_representation(schema)
145-
insert: str = cast(
146-
str,
147-
self.generate_insert_statement(
148-
table.name,
149-
columns,
150-
),
151-
)
152-
self.logger.info("Inserting with SQL: %s", insert)
147+
copy_statement: str = self.generate_copy_statement(table.name, columns)
148+
self.logger.info("Inserting with SQL: %s", copy_statement)
153149
# Only one record per PK, we want to take the last one
154-
data_to_insert: List[Dict[str, Any]] = []
150+
data_to_insert: Tuple[Tuple[Any]] = None
155151

156152
if self.append_only is False:
157-
insert_records: Dict[str, Dict] = {} # pk : record
153+
copy_values: Dict[str, Tuple] = {} # pk : values
158154
for record in records:
159-
insert_record = {}
160-
for column in columns:
161-
insert_record[column.name] = record.get(column.name)
155+
values = tuple((record.get(column.name) for column in columns))
162156
# No need to check for a KeyError here because the SDK already
163157
# guarantees that all key properties exist in the record.
164158
primary_key_value = "".join([str(record[key]) for key in primary_keys])
165-
insert_records[primary_key_value] = insert_record
166-
data_to_insert = list(insert_records.values())
159+
copy_values[primary_key_value] = values
160+
data_to_insert = tuple(copy_values.values())
167161
else:
168-
for record in records:
169-
insert_record = {}
170-
for column in columns:
171-
insert_record[column.name] = record.get(column.name)
172-
data_to_insert.append(insert_record)
173-
connection.execute(insert, data_to_insert)
162+
data_to_insert = [
163+
tuple((record.get(column.name) for column in columns))
164+
for record in records
165+
]
166+
167+
# Prepare a buffer with the values as csv.
168+
buffer = StringIO()
169+
writer = csv.writer(buffer)
170+
writer.writerows(data_to_insert)
171+
buffer.seek(0)
172+
173+
with connection.connection.cursor() as cur:
174+
cur.copy_expert(sql=copy_statement, file=buffer)
175+
174176
return True
175177

176178
def upsert(

0 commit comments

Comments
 (0)