feat: Users can now opt-in into using COPY to load data #483

Merged · 18 commits · Nov 22, 2024
21 changes: 19 additions & 2 deletions .github/workflows/ci_workflow.yml
@@ -29,7 +29,7 @@ env:

jobs:
tests:
name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }}
name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }} / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
@@ -42,15 +42,24 @@ jobs:
- "3.9"
postgres-version:
- "17"
use-copy:
- "true"
include:
- python-version: "3.13"
postgres-version: "13"
use-copy: "true"
- python-version: "3.13"
postgres-version: "14"
use-copy: "true"
- python-version: "3.13"
postgres-version: "15"
use-copy: "true"
- python-version: "3.13"
postgres-version: "16"
use-copy: "true"
- python-version: "3.13"
postgres-version: "17"
use-copy: "false"
steps:
- uses: actions/checkout@v4
with:
@@ -78,14 +87,16 @@ jobs:
python -m pip install --upgrade pip
pipx install tox
- name: Run pytest
env:
TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
run: |
tox -e ${{ matrix.python-version }}
- name: Run lint
run: |
tox -e lint

integration:
name: Meltano integration test
name: Meltano integration test / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
runs-on: ubuntu-latest
services:
postgres:
@@ -99,6 +110,10 @@
--health-retries 5
ports:
- 5432:5432
strategy:
fail-fast: false
matrix:
use-copy: ["true", "false"]
steps:
- uses: actions/checkout@v4
- name: Set up Python
@@ -113,4 +128,6 @@ jobs:
pipx install meltano
meltano --version
- name: smoke-test-tap
env:
TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
run: meltano run tap-smoke-test target-postgres
11 changes: 0 additions & 11 deletions .pre-commit-config.yaml
@@ -23,14 +23,3 @@ repos:
- id: ruff
args: [--fix]
- id: ruff-format

- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.13.0'
hooks:
- id: mypy
exclude: tests
additional_dependencies:
- types-paramiko
- types-simplejson
- types-sqlalchemy
- types-jsonschema
1 change: 1 addition & 0 deletions README.md
@@ -31,6 +31,7 @@ This target is tested with all actively supported [Python](https://devguide.pyth
| user | False | None | User name used to authenticate. |
| password | False | None | Password used to authenticate. |
| database | False | None | Database name. |
| use_copy | False | None | Use the COPY command to insert data. This is usually faster than INSERT statements. This option is only available for the postgres+psycopg dialect+driver combination. |
| default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |
| activate_version | False | 1 | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. |
| hard_delete | False | 0 | When activate version is sent from a tap, this specifies whether we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. |
2 changes: 2 additions & 0 deletions meltano.yml
@@ -59,6 +59,8 @@ plugins:
kind: integer
- name: user
- name: database
- name: use_copy
kind: boolean
- name: target_schema
- name: add_record_metadata
kind: boolean
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -58,6 +58,9 @@ types-jsonschema = ">=4.19.0.3"

[tool.mypy]
exclude = "tests"
warn_redundant_casts = true
warn_unused_configs = true
warn_unused_ignores = true

[[tool.mypy.overrides]]
module = ["sshtunnel"]
2 changes: 1 addition & 1 deletion target_postgres/connector.py
@@ -756,7 +756,7 @@ def guess_key_type(self, key_data: str) -> paramiko.PKey:
paramiko.Ed25519Key,
):
try:
key = key_class.from_private_key(io.StringIO(key_data)) # type: ignore[attr-defined]
key = key_class.from_private_key(io.StringIO(key_data))
except paramiko.SSHException: # noqa: PERF203
continue
else:
89 changes: 74 additions & 15 deletions target_postgres/sinks.py
@@ -119,6 +119,57 @@ def generate_temp_table_name(self):
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column],
) -> str:
"""Generate a copy statement for bulk copy.

Args:
full_table_name: the target table name.
columns: the target table columns.

Returns:
A copy statement.
"""
columns_list = ", ".join(f'"{column.name}"' for column in columns)
sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'

return sql
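Read outside the diff, the statement builder can be sketched as a standalone helper. This is a simplified version that takes plain string column names instead of SQLAlchemy `Column` objects; the table and column names below are illustrative, not from the PR:

```python
def generate_copy_statement(full_table_name: str, column_names: list[str]) -> str:
    # Double-quote every identifier so mixed-case and reserved names survive.
    columns_list = ", ".join(f'"{name}"' for name in column_names)
    return f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'

print(generate_copy_statement("my_table", ["id", "name"]))
# COPY "my_table" ("id", "name") FROM STDIN
```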

def _do_copy(
self,
connection: sa.engine.Connection,
copy_statement: str,
columns: list[sa.Column],
data_to_copy: list[dict[str, t.Any]],
) -> None:
# Prepare to process the rows into csv. Use each column's bind_processor to do
# most of the work, then do the final construction of the csv rows ourselves
# to control exactly how values are converted and which ones are quoted.
column_bind_processors = {
column.name: column.type.bind_processor(connection.dialect)
for column in columns
}

# Use copy to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
for row in data_to_copy:
processed_row = []
for row_column_name in row:
if column_bind_processors[row_column_name] is not None:
processed_row.append(
column_bind_processors[row_column_name](
row[row_column_name]
)
)
else:
processed_row.append(row[row_column_name])

copy.write_row(processed_row)
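The per-column dispatch above can be illustrated without a database connection. The processors below are hypothetical stand-ins for what `column.type.bind_processor(connection.dialect)` returns; as in the sink, a processor of `None` means the value passes through unchanged:

```python
from typing import Any, Callable, Optional

# Hypothetical stand-ins for SQLAlchemy bind processors.
column_bind_processors: dict[str, Optional[Callable[[Any], Any]]] = {
    "id": None,                             # no conversion needed
    "active": lambda v: "t" if v else "f",  # illustrative boolean encoding
}

def process_row(row: dict[str, Any]) -> list[Any]:
    """Apply each column's processor, passing values through when it is None."""
    processed = []
    for name, value in row.items():
        processor = column_bind_processors[name]
        processed.append(processor(value) if processor is not None else value)
    return processed

print(process_row({"id": 1, "active": True}))
# [1, 't']
```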

def bulk_insert_records( # type: ignore[override]
self,
table: sa.Table,
@@ -145,35 +196,43 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []

data: list[dict[str, t.Any]] = []

# If append only is False, we only take the latest record per primary key
if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
unique_records: dict[tuple, dict] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
unique_records[primary_key_tuple] = insert_record
data = list(unique_records.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data.append(insert_record)

if self.config["use_copy"]:
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)
self._do_copy(connection, copy_statement, columns, data)
else:
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
connection.execute(insert, data)

return True
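The non-append-only path's last-write-wins deduplication can be sketched on plain dicts; the records and key names below are illustrative:

```python
from typing import Any

def dedupe_by_primary_key(
    records: list[dict[str, Any]],
    primary_keys: list[str],
) -> list[dict[str, Any]]:
    # Later records overwrite earlier ones with the same key tuple,
    # mirroring the unique_records mapping in bulk_insert_records.
    unique: dict[tuple, dict[str, Any]] = {}
    for record in records:
        unique[tuple(record[key] for key in primary_keys)] = record
    return list(unique.values())

rows = [{"id": 1, "v": "old"}, {"id": 2, "v": "x"}, {"id": 1, "v": "new"}]
print(dedupe_by_primary_key(rows, ["id"]))
# [{'id': 1, 'v': 'new'}, {'id': 2, 'v': 'x'}]
```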

def upsert(
11 changes: 11 additions & 0 deletions target_postgres/target.py
@@ -138,6 +138,17 @@ def __init__(
th.StringType,
description="Database name.",
),
th.Property(
"use_copy",
th.BooleanType,
default=False,
description=(
"Use the COPY command to insert data. This is usually faster than "
f"INSERT statements. This option is only available for the {PSYCOPG3} "
"dialect+driver."
),
title="Use COPY",
),
th.Property(
"sqlalchemy_url",
th.StringType,
8 changes: 8 additions & 0 deletions target_postgres/tests/conftest.py
@@ -0,0 +1,8 @@
"""Configuration for pytest."""

import os


def pytest_report_header():
"""Add environment variables to the pytest report header."""
return [f"{var}: value" for var in os.environ if var.startswith("TARGET_POSTGRES")]
4 changes: 0 additions & 4 deletions target_postgres/tests/test_sdk.py
@@ -10,22 +10,18 @@
TargetCamelcaseTest,
TargetCliPrintsTest,
TargetDuplicateRecords,
TargetEncodedStringData,
TargetInvalidSchemaTest,
TargetMultipleStateMessages,
TargetNoPrimaryKeys,
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetRecordMissingRequiredProperty,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
)

from target_postgres.target import TargetPostgres

from .core import create_engine, postgres_config

target_tests = TestSuite(
2 changes: 1 addition & 1 deletion target_postgres/tests/test_target_postgres.py
@@ -158,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
port = postgres_config_no_ssl["port"]

config = {
"sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}"
"sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}"
}
tap = SampleTapCountries(config={}, state=None)
target = TargetPostgres(config=config)
2 changes: 2 additions & 0 deletions tox.ini
@@ -12,6 +12,8 @@ min_version = 4
# Run the python tests.
# To execute, run `tox -e 3.13`
envlist = 3.{9,10,11,12,13}
pass_env =
TARGET_POSTGRES_*
deps =
pytest
commands =