Commit 0761055

edgarrmondragon authored and SpaceCondor committed
feat: Users can now opt-in into using COPY to load data (MeltanoLabs#483)
- **Use Psycopg3 COPY**
- **Update pyproject.toml**
- **Refactor COPY into a method**

---------

Co-authored-by: SpaceCondor <conner267@live.com>
Co-authored-by: Conner Panarella <connerp32@gmail.com>
1 parent 1ff054b commit 0761055

12 files changed

+121
-34
lines changed

.github/workflows/ci_workflow.yml

Lines changed: 19 additions & 2 deletions
@@ -29,7 +29,7 @@ env:
 
 jobs:
   tests:
-    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }}
+    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }} / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -42,15 +42,24 @@ jobs:
         - "3.9"
         postgres-version:
         - "17"
+        use-copy:
+        - "true"
         include:
         - python-version: "3.13"
           postgres-version: "13"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "14"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "15"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "16"
+          use-copy: "true"
+        - python-version: "3.13"
+          postgres-version: "17"
+          use-copy: "false"
     steps:
     - uses: actions/checkout@v4
       with:
@@ -78,14 +87,16 @@ jobs:
         python -m pip install --upgrade pip
         pipx install tox
     - name: Run pytest
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
       run: |
         tox -e ${{ matrix.python-version }}
     - name: Run lint
       run: |
         tox -e lint
 
   integration:
-    name: Meltano integration test
+    name: Meltano integration test / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     services:
       postgres:
@@ -99,6 +110,10 @@ jobs:
           --health-retries 5
         ports:
         - 5432:5432
+    strategy:
+      fail-fast: false
+      matrix:
+        use-copy: ["true", "false"]
     steps:
     - uses: actions/checkout@v4
     - name: Set up Python
@@ -113,4 +128,6 @@ jobs:
         pipx install meltano
         meltano --version
     - name: smoke-test-tap
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
       run: meltano run tap-smoke-test target-postgres
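
The job names in this workflow rely on the GitHub Actions `cond && a || b` idiom as a stand-in for a ternary. As a rough illustration only, the same selection can be sketched in Python. The function name `job_label` is hypothetical and does not appear in the commit.

```python
def job_label(use_copy: str) -> str:
    """Mirror `matrix.use-copy == 'true' && 'COPY' || 'INSERT'`.

    The matrix values are the strings "true" and "false", not booleans,
    so the comparison is made against the literal string "true".
    """
    return "COPY" if use_copy == "true" else "INSERT"


if __name__ == "__main__":
    for value in ("true", "false"):
        print(f"use-copy={value!r} -> {job_label(value)}")
```

The same string is exported to the test step as `TARGET_POSTGRES_USE_COPY`, so each matrix entry exercises one of the two load paths.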

.pre-commit-config.yaml

Lines changed: 0 additions & 11 deletions
@@ -23,14 +23,3 @@ repos:
   - id: ruff
     args: [--fix]
   - id: ruff-format
-
-- repo: https://github.com/pre-commit/mirrors-mypy
-  rev: 'v1.13.0'
-  hooks:
-  - id: mypy
-    exclude: tests
-    additional_dependencies:
-    - types-paramiko
-    - types-simplejson
-    - types-sqlalchemy
-    - types-jsonschema

README.md

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ This target is tested with all actively supported [Python](https://devguide.pyth
 | user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
 | password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
 | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
+| use_copy | False | None | Use the COPY command to insert data. This is usually faster than INSERT statements. This option is only available for the postgres+psycopg dialect+driver combination. |
 | sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect, and all ssl settings. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
 | dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
 | default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |

meltano.yml

Lines changed: 2 additions & 0 deletions
@@ -59,6 +59,8 @@ plugins:
       kind: integer
     - name: user
     - name: database
+    - name: use_copy
+      kind: boolean
     - name: target_schema
     - name: add_record_metadata
       kind: boolean

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -58,6 +58,9 @@ types-jsonschema = ">=4.19.0.3"
 
 [tool.mypy]
 exclude = "tests"
+warn_redundant_casts = true
+warn_unused_configs = true
+warn_unused_ignores = true
 
 [[tool.mypy.overrides]]
 module = ["sshtunnel"]

target_postgres/connector.py

Lines changed: 1 addition & 1 deletion
@@ -765,7 +765,7 @@ def guess_key_type(self, key_data: str) -> paramiko.PKey:
             paramiko.Ed25519Key,
         ):
             try:
-                key = key_class.from_private_key(io.StringIO(key_data))  # type: ignore[attr-defined]
+                key = key_class.from_private_key(io.StringIO(key_data))
             except paramiko.SSHException:  # noqa: PERF203
                 continue
             else:

target_postgres/sinks.py

Lines changed: 73 additions & 15 deletions
@@ -139,6 +139,56 @@ def replace_null_character(d):
                     data[i] = replace_null_character(data[i])
 
         return data
+    def generate_copy_statement(
+        self,
+        full_table_name: str | FullyQualifiedName,
+        columns: list[sa.Column],
+    ) -> str:
+        """Generate a copy statement for bulk copy.
+
+        Args:
+            full_table_name: the target table name.
+            columns: the target table columns.
+
+        Returns:
+            A copy statement.
+        """
+        columns_list = ", ".join(f'"{column.name}"' for column in columns)
+        sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'
+
+        return sql
+
+    def _do_copy(
+        self,
+        connection: sa.engine.Connection,
+        copy_statement: str,
+        columns: list[sa.Column],
+        data_to_copy: list[dict[str, t.Any]],
+    ) -> None:
+        # Prepare to process the rows into csv. Use each column's bind_processor to do
+        # most of the work, then do the final construction of the csv rows ourselves
+        # to control exactly how values are converted and which ones are quoted.
+        column_bind_processors = {
+            column.name: column.type.bind_processor(connection.dialect)
+            for column in columns
+        }
+
+        # Use copy to run the copy statement.
+        # https://www.psycopg.org/psycopg3/docs/basic/copy.html
+        with connection.connection.cursor().copy(copy_statement) as copy:  # type: ignore[attr-defined]
+            for row in data_to_copy:
+                processed_row = []
+                for row_column_name in row:
+                    if column_bind_processors[row_column_name] is not None:
+                        processed_row.append(
+                            column_bind_processors[row_column_name](
+                                row[row_column_name]
+                            )
+                        )
+                    else:
+                        processed_row.append(row[row_column_name])
+
+                copy.write_row(processed_row)
 
     def bulk_insert_records(  # type: ignore[override]
         self,
@@ -166,19 +216,12 @@ def bulk_insert_records(  # type: ignore[override]
             True if table exists, False if not, None if unsure or undetectable.
         """
         columns = self.column_representation(schema)
-        insert: str = t.cast(
-            str,
-            self.generate_insert_statement(
-                table.name,
-                columns,
-            ),
-        )
-        self.logger.info("Inserting with SQL: %s", insert)
-        # Only one record per PK, we want to take the last one
-        data_to_insert: list[dict[str, t.Any]] = []
 
+        data: list[dict[str, t.Any]] = []
+
+        # If append only is False, we only take the latest record one per primary key
         if self.append_only is False:
-            insert_records: dict[tuple, dict] = {}  # pk tuple: record
+            unique_records: dict[tuple, dict] = {}  # pk tuple: values
             for record in records:
                 insert_record = {
                     column.name: (
@@ -191,8 +234,8 @@ def bulk_insert_records(  # type: ignore[override]
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_tuple = tuple(record[key] for key in primary_keys)
-                insert_records[primary_key_tuple] = insert_record
-            data_to_insert = list(insert_records.values())
+                unique_records[primary_key_tuple] = insert_record
+            data = list(unique_records.values())
         else:
             for record in records:
                 insert_record = {
@@ -203,8 +246,23 @@ def bulk_insert_records(  # type: ignore[override]
                     )
                     for column in columns
                 }
-                data_to_insert.append(insert_record)
-        connection.execute(insert, data_to_insert)
+                data.append(insert_record)
+
+        if self.config["use_copy"]:
+            copy_statement: str = self.generate_copy_statement(table.name, columns)
+            self.logger.info("Inserting with SQL: %s", copy_statement)
+            self._do_copy(connection, copy_statement, columns, data)
+        else:
+            insert: str = t.cast(
+                str,
+                self.generate_insert_statement(
+                    table.name,
+                    columns,
+                ),
+            )
+            self.logger.info("Inserting with SQL: %s", insert)
+            connection.execute(insert, data)
+
         return True
 
     def upsert(
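
The three steps this diff touches in `bulk_insert_records` (de-duplicating by primary key, building the COPY statement, and running each value through its column's bind processor) can be sketched without a database. This is a minimal illustration, not the sink's implementation. The function names `latest_per_primary_key`, `build_copy_statement`, and `process_rows` are hypothetical, and plain callables such as `json.dumps` stand in for the SQLAlchemy bind processors. In the commit itself, each processed row is handed to psycopg's `copy.write_row`.

```python
import json
from typing import Any, Callable, Optional

Row = dict[str, Any]
# A processor is a callable applied to a value, or None for "pass through".
Processor = Optional[Callable[[Any], Any]]


def latest_per_primary_key(records: list[Row], primary_keys: list[str]) -> list[Row]:
    """Keep only the last record seen for each primary-key tuple."""
    unique: dict[tuple, Row] = {}
    for record in records:
        unique[tuple(record[key] for key in primary_keys)] = record
    return list(unique.values())


def build_copy_statement(table_name: str, column_names: list[str]) -> str:
    """Build a COPY ... FROM STDIN statement with double-quoted identifiers."""
    columns_list = ", ".join(f'"{name}"' for name in column_names)
    return f'COPY "{table_name}" ({columns_list}) FROM STDIN'


def process_rows(rows: list[Row], processors: dict[str, Processor]) -> list[list[Any]]:
    """Run each value through its column's processor, if one is defined."""
    processed = []
    for row in rows:
        processed.append(
            [
                value if processors[name] is None else processors[name](value)
                for name, value in row.items()
            ]
        )
    return processed


if __name__ == "__main__":
    records = [
        {"id": 1, "payload": {"v": "a"}},
        {"id": 2, "payload": {"v": "b"}},
        {"id": 1, "payload": {"v": "c"}},  # replaces the first record
    ]
    rows = latest_per_primary_key(records, ["id"])
    print(build_copy_statement("users", ["id", "payload"]))
    for row in process_rows(rows, {"id": None, "payload": json.dumps}):
        print(row)
```

Because each row dict is built by iterating over the same column list that produces the COPY column list, iterating over the row's keys yields values in the order the statement expects.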

target_postgres/target.py

Lines changed: 11 additions & 0 deletions
@@ -138,6 +138,17 @@ def __init__(
             th.StringType,
             description="Database name.",
         ),
+        th.Property(
+            "use_copy",
+            th.BooleanType,
+            default=False,
+            description=(
+                "Use the COPY command to insert data. This is usually faster than "
+                f"INSERT statements. This option is only available for the {PSYCOPG3} "
+                "dialect+driver."
+            ),
+            title="Use COPY",
+        ),
         th.Property(
             "sqlalchemy_url",
             th.StringType,
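
The setting description says COPY is only available for the `PSYCOPG3` dialect+driver, but this diff shows no check that enforces it. The sketch below shows what such a guard could look like. It is hypothetical and not part of the commit: the helper name `copy_enabled` is invented, the value given for `PSYCOPG3` is an assumption about the constant in `target_postgres`, and the sketch assumes `dialect+driver` is set explicitly in the config.

```python
from typing import Any

# Assumed value of the PSYCOPG3 constant referenced in the description.
PSYCOPG3 = "postgresql+psycopg"


def copy_enabled(config: dict[str, Any]) -> bool:
    """Return True if COPY should be used, rejecting unsupported drivers.

    Hypothetical guard: `use_copy` defaults to False, as in the new setting,
    and enabling it with any other dialect+driver raises a ValueError.
    """
    use_copy = bool(config.get("use_copy", False))
    if use_copy and config.get("dialect+driver") != PSYCOPG3:
        raise ValueError(f"use_copy requires the {PSYCOPG3} dialect+driver")
    return use_copy


if __name__ == "__main__":
    print(copy_enabled({"use_copy": True, "dialect+driver": PSYCOPG3}))
    print(copy_enabled({"dialect+driver": "postgresql+psycopg2"}))
```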

target_postgres/tests/conftest.py

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+"""Configuration for pytest."""
+
+import os
+
+
+def pytest_report_header():
+    """Add environment variables to the pytest report header."""
+    return [f"{var}: value" for var in os.environ if var.startswith("TARGET_POSTGRES")]
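
As written, the hook emits the literal text `value` for each matching variable rather than the variable's contents. The commit does not say whether that is intentional. If the actual values are wanted in the report header, one possible variant is sketched below. It is an assumption-laden illustration: `report_header_lines` and `SENSITIVE_MARKERS` are hypothetical names, and masking is added because variables such as a password would otherwise end up in CI logs.

```python
import os
from collections.abc import Mapping
from typing import Optional

# Hypothetical markers for variables whose values should not be printed.
SENSITIVE_MARKERS = ("PASSWORD", "SECRET", "KEY")


def report_header_lines(environ: Optional[Mapping[str, str]] = None) -> list[str]:
    """List TARGET_POSTGRES* variables with values, masking sensitive ones."""
    env = os.environ if environ is None else environ
    lines = []
    for var in sorted(env):
        if not var.startswith("TARGET_POSTGRES"):
            continue
        masked = any(marker in var for marker in SENSITIVE_MARKERS)
        lines.append(f"{var}: {'***' if masked else env[var]}")
    return lines


def pytest_report_header():
    """Add environment variables to the pytest report header."""
    return report_header_lines()
```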

target_postgres/tests/test_sdk.py

Lines changed: 0 additions & 4 deletions
@@ -10,22 +10,18 @@
     TargetCamelcaseTest,
     TargetCliPrintsTest,
     TargetDuplicateRecords,
-    TargetEncodedStringData,
     TargetInvalidSchemaTest,
-    TargetMultipleStateMessages,
     TargetNoPrimaryKeys,
     TargetOptionalAttributes,
     TargetRecordBeforeSchemaTest,
     TargetRecordMissingKeyProperty,
     TargetRecordMissingOptionalFields,
-    TargetRecordMissingRequiredProperty,
     TargetSchemaNoProperties,
     TargetSchemaUpdates,
     TargetSpecialCharsInAttributes,
 )
 
 from target_postgres.target import TargetPostgres
-
 from .core import create_engine, postgres_config
 
 target_tests = TestSuite(
