
Commit edbfad7

feat: Users can now opt-in into using COPY to load data (#483)
- **Use Psycopg3 COPY**
- **Update pyproject.toml**
- **Refactor COPY into a method**

Co-authored-by: SpaceCondor <conner267@live.com>
Co-authored-by: Conner Panarella <connerp32@gmail.com>
1 parent 2e73a09 commit edbfad7

File tree: 12 files changed (+122, -34 lines)

.github/workflows/ci_workflow.yml

Lines changed: 19 additions & 2 deletions

@@ -29,7 +29,7 @@ env:
 
 jobs:
   tests:
-    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }}
+    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }} / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -42,15 +42,24 @@ jobs:
         - "3.9"
         postgres-version:
         - "17"
+        use-copy:
+        - "true"
         include:
         - python-version: "3.13"
           postgres-version: "13"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "14"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "15"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "16"
+          use-copy: "true"
+        - python-version: "3.13"
+          postgres-version: "17"
+          use-copy: "false"
     steps:
     - uses: actions/checkout@v4
       with:
@@ -78,14 +87,16 @@ jobs:
         python -m pip install --upgrade pip
         pipx install tox
     - name: Run pytest
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
       run: |
         tox -e ${{ matrix.python-version }}
     - name: Run lint
       run: |
         tox -e lint
 
   integration:
-    name: Meltano integration test
+    name: Meltano integration test / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     services:
       postgres:
@@ -99,6 +110,10 @@ jobs:
           --health-retries 5
         ports:
         - 5432:5432
+    strategy:
+      fail-fast: false
+      matrix:
+        use-copy: ["true", "false"]
     steps:
     - uses: actions/checkout@v4
     - name: Set up Python
@@ -113,4 +128,6 @@ jobs:
         pipx install meltano
         meltano --version
     - name: smoke-test-tap
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
       run: meltano run tap-smoke-test target-postgres
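Both CI paths toggle COPY through the `TARGET_POSTGRES_USE_COPY` environment variable rather than a config file. The sketch below shows one way such a variable can reach the `use_copy` setting, via the Singer SDK's environment-based config parsing; it is illustrative only and not necessarily the exact mechanism these tests use, and the empty base config is an assumption.

```python
# Sketch only: TARGET_POSTGRES_USE_COPY feeding the use_copy setting when
# env-based config parsing is enabled. Values here are examples, not CI code.
import os

from target_postgres.target import TargetPostgres

os.environ["TARGET_POSTGRES_USE_COPY"] = "true"  # same variable the CI matrix sets

# parse_env_config=True asks the Singer SDK to merge TARGET_POSTGRES_* env vars
# into the config; how "true" is coerced (str vs. bool) depends on the SDK version.
target = TargetPostgres(config={}, parse_env_config=True)
print(target.config.get("use_copy"))
```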

.pre-commit-config.yaml

Lines changed: 0 additions & 11 deletions

@@ -23,14 +23,3 @@ repos:
   - id: ruff
     args: [--fix]
   - id: ruff-format
-
-- repo: https://github.com/pre-commit/mirrors-mypy
-  rev: 'v1.13.0'
-  hooks:
-  - id: mypy
-    exclude: tests
-    additional_dependencies:
-    - types-paramiko
-    - types-simplejson
-    - types-sqlalchemy
-    - types-jsonschema

README.md

Lines changed: 1 addition & 0 deletions

@@ -31,6 +31,7 @@ This target is tested with all actively supported [Python](https://devguide.pyth
 | user | False | None | User name used to authenticate. |
 | password | False | None | Password used to authenticate. |
 | database | False | None | Database name. |
+| use_copy | False | None | Use the COPY command to insert data. This is usually faster than INSERT statements. This option is only available for the postgres+psycopg dialect+driver combination. |
 | default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |
 | activate_version | False | 1 | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. |
 | hard_delete | False | 0 | When activate version is sent from a tap this specifies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. |
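For context (not part of the diff), `use_copy` is set like any other setting. A minimal sketch with placeholder connection values:

```python
# Illustrative only: connection values are placeholders, not taken from this commit.
from target_postgres.target import TargetPostgres

target = TargetPostgres(
    config={
        "host": "localhost",     # placeholder
        "port": 5432,
        "user": "postgres",      # placeholder
        "password": "postgres",  # placeholder
        "database": "postgres",  # placeholder
        "use_copy": True,        # opt in to COPY-based bulk loading
    }
)
```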

meltano.yml

Lines changed: 2 additions & 0 deletions

@@ -59,6 +59,8 @@ plugins:
       kind: integer
     - name: user
     - name: database
+    - name: use_copy
+      kind: boolean
     - name: target_schema
     - name: add_record_metadata
       kind: boolean

pyproject.toml

Lines changed: 3 additions & 0 deletions

@@ -58,6 +58,9 @@ types-jsonschema = ">=4.19.0.3"
 
 [tool.mypy]
 exclude = "tests"
+warn_redundant_casts = true
+warn_unused_configs = true
+warn_unused_ignores = true
 
 [[tool.mypy.overrides]]
 module = ["sshtunnel"]

target_postgres/connector.py

Lines changed: 1 addition & 1 deletion

@@ -756,7 +756,7 @@ def guess_key_type(self, key_data: str) -> paramiko.PKey:
             paramiko.Ed25519Key,
         ):
             try:
-                key = key_class.from_private_key(io.StringIO(key_data))  # type: ignore[attr-defined]
+                key = key_class.from_private_key(io.StringIO(key_data))
             except paramiko.SSHException:  # noqa: PERF203
                 continue
             else:

target_postgres/sinks.py

Lines changed: 74 additions & 15 deletions

@@ -119,6 +119,57 @@ def generate_temp_table_name(self):
         # in postgres, used a guid just in case we are using the same session
         return f"{str(uuid.uuid4()).replace('-', '_')}"
 
+    def generate_copy_statement(
+        self,
+        full_table_name: str | FullyQualifiedName,
+        columns: list[sa.Column],
+    ) -> str:
+        """Generate a copy statement for bulk copy.
+
+        Args:
+            full_table_name: the target table name.
+            columns: the target table columns.
+
+        Returns:
+            A copy statement.
+        """
+        columns_list = ", ".join(f'"{column.name}"' for column in columns)
+        sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'
+
+        return sql
+
+    def _do_copy(
+        self,
+        connection: sa.engine.Connection,
+        copy_statement: str,
+        columns: list[sa.Column],
+        data_to_copy: list[dict[str, t.Any]],
+    ) -> None:
+        # Prepare to process the rows into csv. Use each column's bind_processor to do
+        # most of the work, then do the final construction of the csv rows ourselves
+        # to control exactly how values are converted and which ones are quoted.
+        column_bind_processors = {
+            column.name: column.type.bind_processor(connection.dialect)
+            for column in columns
+        }
+
+        # Use copy to run the copy statement.
+        # https://www.psycopg.org/psycopg3/docs/basic/copy.html
+        with connection.connection.cursor().copy(copy_statement) as copy:  # type: ignore[attr-defined]
+            for row in data_to_copy:
+                processed_row = []
+                for row_column_name in row:
+                    if column_bind_processors[row_column_name] is not None:
+                        processed_row.append(
+                            column_bind_processors[row_column_name](
+                                row[row_column_name]
+                            )
+                        )
+                    else:
+                        processed_row.append(row[row_column_name])
+
+                copy.write_row(processed_row)
+
     def bulk_insert_records(  # type: ignore[override]
         self,
         table: sa.Table,
@@ -145,35 +196,43 @@ def bulk_insert_records(  # type: ignore[override]
             True if table exists, False if not, None if unsure or undetectable.
         """
         columns = self.column_representation(schema)
-        insert: str = t.cast(
-            str,
-            self.generate_insert_statement(
-                table.name,
-                columns,
-            ),
-        )
-        self.logger.info("Inserting with SQL: %s", insert)
-        # Only one record per PK, we want to take the last one
-        data_to_insert: list[dict[str, t.Any]] = []
 
+        data: list[dict[str, t.Any]] = []
+
+        # If append only is False, we only take the latest record one per primary key
         if self.append_only is False:
-            insert_records: dict[tuple, dict] = {}  # pk tuple: record
+            unique_records: dict[tuple, dict] = {}  # pk tuple: values
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_tuple = tuple(record[key] for key in primary_keys)
-                insert_records[primary_key_tuple] = insert_record
-            data_to_insert = list(insert_records.values())
+                unique_records[primary_key_tuple] = insert_record
+            data = list(unique_records.values())
         else:
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
-                data_to_insert.append(insert_record)
-        connection.execute(insert, data_to_insert)
+                data.append(insert_record)
+
+        if self.config["use_copy"]:
+            copy_statement: str = self.generate_copy_statement(table.name, columns)
+            self.logger.info("Inserting with SQL: %s", copy_statement)
+            self._do_copy(connection, copy_statement, columns, data)
+        else:
+            insert: str = t.cast(
+                str,
+                self.generate_insert_statement(
+                    table.name,
+                    columns,
+                ),
+            )
+            self.logger.info("Inserting with SQL: %s", insert)
+            connection.execute(insert, data)
+
         return True
 
     def upsert(
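The new `_do_copy` drives psycopg3's COPY protocol through the raw DBAPI cursor. For readers unfamiliar with that protocol, here is a standalone sketch (the DSN, table, and column names are hypothetical; this is not code from the commit):

```python
# Standalone psycopg3 sketch: cursor.copy() opens a COPY ... FROM STDIN stream and
# write_row() sends one tuple per row, which is what _do_copy does per record.
import psycopg

with psycopg.connect("dbname=example user=example") as conn:  # placeholder DSN
    with conn.cursor() as cur:
        with cur.copy('COPY "my_table" ("id", "name") FROM STDIN') as copy:
            for row in [(1, "alpha"), (2, "beta")]:
                copy.write_row(row)
```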

target_postgres/target.py

Lines changed: 11 additions & 0 deletions

@@ -138,6 +138,17 @@ def __init__(
                 th.StringType,
                 description="Database name.",
             ),
+            th.Property(
+                "use_copy",
+                th.BooleanType,
+                default=False,
+                description=(
+                    "Use the COPY command to insert data. This is usually faster than "
+                    f"INSERT statements. This option is only available for the {PSYCOPG3} "
+                    "dialect+driver."
+                ),
+                title="Use COPY",
+            ),
             th.Property(
                 "sqlalchemy_url",
                 th.StringType,
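For readers unfamiliar with the Singer SDK typing helpers used above, a small sketch (assumed standalone usage, not part of the commit) of how such a property renders into the JSON Schema that targets expose for their settings:

```python
# Sketch: th.PropertiesList(...).to_dict() renders declared properties as a
# JSON Schema fragment, the same mechanism the target's config schema uses.
from singer_sdk import typing as th

config_schema = th.PropertiesList(
    th.Property(
        "use_copy",
        th.BooleanType,
        default=False,
        description="Use the COPY command to insert data.",
        title="Use COPY",
    ),
).to_dict()

print(config_schema["properties"]["use_copy"])
```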

target_postgres/tests/conftest.py

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+"""Configuration for pytest."""
+
+import os
+
+
+def pytest_report_header():
+    """Add environment variables to the pytest report header."""
+    return [f"{var}: value" for var in os.environ if var.startswith("TARGET_POSTGRES")]

target_postgres/tests/test_sdk.py

Lines changed: 0 additions & 4 deletions

@@ -10,22 +10,18 @@
     TargetCamelcaseTest,
     TargetCliPrintsTest,
     TargetDuplicateRecords,
-    TargetEncodedStringData,
     TargetInvalidSchemaTest,
-    TargetMultipleStateMessages,
     TargetNoPrimaryKeys,
     TargetOptionalAttributes,
     TargetRecordBeforeSchemaTest,
     TargetRecordMissingKeyProperty,
     TargetRecordMissingOptionalFields,
-    TargetRecordMissingRequiredProperty,
     TargetSchemaNoProperties,
     TargetSchemaUpdates,
     TargetSpecialCharsInAttributes,
 )
 
 from target_postgres.target import TargetPostgres
-
 from .core import create_engine, postgres_config
 
 target_tests = TestSuite(
