diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml
index 90b0975..8a9f2ee 100644
--- a/.github/workflows/ci_workflow.yml
+++ b/.github/workflows/ci_workflow.yml
@@ -29,7 +29,7 @@ env:
 
 jobs:
   tests:
-    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }}
+    name: Python ${{ matrix.python-version }} / Postgres ${{ matrix.postgres-version }} / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -42,15 +42,24 @@ jobs:
         - "3.9"
         postgres-version:
         - "17"
+        use-copy:
+        - "true"
         include:
         - python-version: "3.13"
           postgres-version: "13"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "14"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "15"
+          use-copy: "true"
         - python-version: "3.13"
           postgres-version: "16"
+          use-copy: "true"
+        - python-version: "3.13"
+          postgres-version: "17"
+          use-copy: "false"
     steps:
     - uses: actions/checkout@v4
       with:
@@ -78,6 +87,8 @@ jobs:
         python -m pip install --upgrade pip
         pipx install tox
     - name: Run pytest
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
      run: |
        tox -e ${{ matrix.python-version }}
     - name: Run lint
@@ -85,7 +96,7 @@
       run: |
         tox -e lint
   integration:
-    name: Meltano integration test
+    name: Meltano integration test / ${{ matrix.use-copy == 'true' && 'COPY' || 'INSERT' }}
     runs-on: ubuntu-latest
     services:
       postgres:
@@ -99,6 +110,10 @@
           --health-retries 5
         ports:
         - 5432:5432
+    strategy:
+      fail-fast: false
+      matrix:
+        use-copy: ["true", "false"]
     steps:
     - uses: actions/checkout@v4
     - name: Set up Python
@@ -113,4 +128,6 @@
         pipx install meltano
         meltano --version
     - name: smoke-test-tap
+      env:
+        TARGET_POSTGRES_USE_COPY: ${{ matrix.use-copy }}
       run: meltano run tap-smoke-test target-postgres
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5d0da2f..17ee9f3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -23,14 +23,3 @@ repos:
   - id: ruff
     args: [--fix]
   - id: ruff-format
-
-- repo: https://github.com/pre-commit/mirrors-mypy
-  rev: 'v1.13.0'
-  hooks:
-  - id: mypy
-    exclude: tests
-    additional_dependencies:
-    - types-paramiko
-    - types-simplejson
-    - types-sqlalchemy
-    - types-jsonschema
diff --git a/README.md b/README.md
index 4880900..9e6ab68 100644
--- a/README.md
+++ b/README.md
@@ -31,6 +31,7 @@ This target is tested with all actively supported [Python](https://devguide.pyth
 | user | False | None | User name used to authenticate. |
 | password | False | None | Password used to authenticate. |
 | database | False | None | Database name. |
+| use_copy | False | None | Use the COPY command to insert data. This is usually faster than INSERT statements. This option is only available for the postgres+psycopg dialect+driver combination. |
 | default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |
 | activate_version | False | 1 | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. |
 | hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. |
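Both CI jobs toggle the new code path purely through the environment: the matrix value is exported as `TARGET_POSTGRES_USE_COPY`, which Meltano maps onto the plugin's `use_copy` setting, and which Singer SDK targets can likewise merge into their config when env parsing is enabled. A minimal sketch of that mapping, not part of this PR, with placeholder connection values:

```python
# Sketch: how a TARGET_POSTGRES_* variable set by the CI matrix can reach the
# target's config. SDK plugins merge TARGET_POSTGRES_* environment variables
# into their config when parse_env_config is enabled (`--config=ENV` on the CLI).
import os

from target_postgres.target import TargetPostgres

os.environ["TARGET_POSTGRES_USE_COPY"] = "true"

target = TargetPostgres(
    config={"host": "localhost", "user": "postgres", "database": "postgres"},
    parse_env_config=True,  # merge TARGET_POSTGRES_* into the config
)
print(target.config.get("use_copy"))  # expected: True, coerced for the boolean setting
```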
diff --git a/meltano.yml b/meltano.yml
index 2c31ed8..4d77006 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -59,6 +59,8 @@ plugins:
         kind: integer
       - name: user
       - name: database
+      - name: use_copy
+        kind: boolean
       - name: target_schema
       - name: add_record_metadata
         kind: boolean
diff --git a/pyproject.toml b/pyproject.toml
index fb439da..d2144ad 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,9 @@ types-jsonschema = ">=4.19.0.3"
 
 [tool.mypy]
 exclude = "tests"
+warn_redundant_casts = true
+warn_unused_configs = true
+warn_unused_ignores = true
 
 [[tool.mypy.overrides]]
 module = ["sshtunnel"]
diff --git a/target_postgres/connector.py b/target_postgres/connector.py
index 28713e8..f627876 100644
--- a/target_postgres/connector.py
+++ b/target_postgres/connector.py
@@ -756,7 +756,7 @@ def guess_key_type(self, key_data: str) -> paramiko.PKey:
             paramiko.Ed25519Key,
         ):
             try:
-                key = key_class.from_private_key(io.StringIO(key_data))  # type: ignore[attr-defined]
+                key = key_class.from_private_key(io.StringIO(key_data))
             except paramiko.SSHException:  # noqa: PERF203
                 continue
             else:
diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index f2f9e42..d6959cd 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -119,6 +119,57 @@ def generate_temp_table_name(self):
         # in postgres, used a guid just in case we are using the same session
         return f"{str(uuid.uuid4()).replace('-', '_')}"
 
+    def generate_copy_statement(
+        self,
+        full_table_name: str | FullyQualifiedName,
+        columns: list[sa.Column],
+    ) -> str:
+        """Generate a copy statement for bulk copy.
+
+        Args:
+            full_table_name: the target table name.
+            columns: the target table columns.
+
+        Returns:
+            A copy statement.
+        """
+        columns_list = ", ".join(f'"{column.name}"' for column in columns)
+        sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'
+
+        return sql
+
+    def _do_copy(
+        self,
+        connection: sa.engine.Connection,
+        copy_statement: str,
+        columns: list[sa.Column],
+        data_to_copy: list[dict[str, t.Any]],
+    ) -> None:
+        # Prepare to process the rows into csv. Use each column's bind_processor to do
+        # most of the work, then do the final construction of the csv rows ourselves
+        # to control exactly how values are converted and which ones are quoted.
+        column_bind_processors = {
+            column.name: column.type.bind_processor(connection.dialect)
+            for column in columns
+        }
+
+        # Use copy to run the copy statement.
+        # https://www.psycopg.org/psycopg3/docs/basic/copy.html
+        with connection.connection.cursor().copy(copy_statement) as copy:  # type: ignore[attr-defined]
+            for row in data_to_copy:
+                processed_row = []
+                for row_column_name in row:
+                    if column_bind_processors[row_column_name] is not None:
+                        processed_row.append(
+                            column_bind_processors[row_column_name](
+                                row[row_column_name]
+                            )
+                        )
+                    else:
+                        processed_row.append(row[row_column_name])
+
+                copy.write_row(processed_row)
+
     def bulk_insert_records(  # type: ignore[override]
         self,
         table: sa.Table,
@@ -145,19 +196,12 @@
             True if table exists, False if not, None if unsure or undetectable.
         """
         columns = self.column_representation(schema)
-        insert: str = t.cast(
-            str,
-            self.generate_insert_statement(
-                table.name,
-                columns,
-            ),
-        )
-        self.logger.info("Inserting with SQL: %s", insert)
-        # Only one record per PK, we want to take the last one
-        data_to_insert: list[dict[str, t.Any]] = []
+        data: list[dict[str, t.Any]] = []
+
+        # If append only is False, we only take the latest record one per primary key
         if self.append_only is False:
-            insert_records: dict[tuple, dict] = {}  # pk tuple: record
+            unique_records: dict[tuple, dict] = {}  # pk tuple: values
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
@@ -165,15 +209,30 @@
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_tuple = tuple(record[key] for key in primary_keys)
-                insert_records[primary_key_tuple] = insert_record
-            data_to_insert = list(insert_records.values())
+                unique_records[primary_key_tuple] = insert_record
+            data = list(unique_records.values())
         else:
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
-                data_to_insert.append(insert_record)
-        connection.execute(insert, data_to_insert)
+                data.append(insert_record)
+
+        if self.config["use_copy"]:
+            copy_statement: str = self.generate_copy_statement(table.name, columns)
+            self.logger.info("Inserting with SQL: %s", copy_statement)
+            self._do_copy(connection, copy_statement, columns, data)
+        else:
+            insert: str = t.cast(
+                str,
+                self.generate_insert_statement(
+                    table.name,
+                    columns,
+                ),
+            )
+            self.logger.info("Inserting with SQL: %s", insert)
+            connection.execute(insert, data)
+
         return True
 
     def upsert(
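`generate_copy_statement` quotes the target table as a single identifier, which fits the unqualified temp-table names that `bulk_insert_records` passes in via `table.name`; a schema-qualified name would need each dotted part quoted separately. A standalone reproduction of the generated SQL shape, with plain strings standing in for `sa.Column` objects:

```python
# Illustration only: the SQL shape produced by generate_copy_statement,
# reimplemented with plain strings instead of SQLAlchemy columns.
def copy_statement(table_name: str, column_names: list[str]) -> str:
    columns_list = ", ".join(f'"{name}"' for name in column_names)
    return f'COPY "{table_name}" ({columns_list}) FROM STDIN'


print(copy_statement("d3adbeef_temp", ["id", "name", "_sdc_deleted_at"]))
# COPY "d3adbeef_temp" ("id", "name", "_sdc_deleted_at") FROM STDIN
```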
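`_do_copy` reaches past SQLAlchemy to the underlying psycopg 3 connection because the streaming COPY protocol is driver-specific; `Copy.write_row` handles the wire encoding and escaping itself, so the sink only needs each column's `bind_processor` for Python-side value conversion rather than assembling CSV. A minimal standalone sketch of that protocol, assuming a reachable database (the DSN is a placeholder; this is not the sink's code path):

```python
# Sketch of the psycopg 3 COPY protocol used by _do_copy
# (https://www.psycopg.org/psycopg3/docs/basic/copy.html).
import psycopg

rows = [(1, "alice"), (2, "bob")]

with psycopg.connect("dbname=example") as conn, conn.cursor() as cur:
    cur.execute("CREATE TEMP TABLE demo (id int, name text)")
    with cur.copy('COPY demo ("id", "name") FROM STDIN') as copy:
        for row in rows:
            copy.write_row(row)  # psycopg encodes and escapes each value
    print(cur.execute("SELECT count(*) FROM demo").fetchone())  # (2,)
```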
""" columns = self.column_representation(schema) - insert: str = t.cast( - str, - self.generate_insert_statement( - table.name, - columns, - ), - ) - self.logger.info("Inserting with SQL: %s", insert) - # Only one record per PK, we want to take the last one - data_to_insert: list[dict[str, t.Any]] = [] + data: list[dict[str, t.Any]] = [] + + # If append only is False, we only take the latest record one per primary key if self.append_only is False: - insert_records: dict[tuple, dict] = {} # pk tuple: record + unique_records: dict[tuple, dict] = {} # pk tuple: values for record in records: insert_record = { column.name: record.get(column.name) for column in columns @@ -165,15 +209,30 @@ def bulk_insert_records( # type: ignore[override] # No need to check for a KeyError here because the SDK already # guarantees that all key properties exist in the record. primary_key_tuple = tuple(record[key] for key in primary_keys) - insert_records[primary_key_tuple] = insert_record - data_to_insert = list(insert_records.values()) + unique_records[primary_key_tuple] = insert_record + data = list(unique_records.values()) else: for record in records: insert_record = { column.name: record.get(column.name) for column in columns } - data_to_insert.append(insert_record) - connection.execute(insert, data_to_insert) + data.append(insert_record) + + if self.config["use_copy"]: + copy_statement: str = self.generate_copy_statement(table.name, columns) + self.logger.info("Inserting with SQL: %s", copy_statement) + self._do_copy(connection, copy_statement, columns, data) + else: + insert: str = t.cast( + str, + self.generate_insert_statement( + table.name, + columns, + ), + ) + self.logger.info("Inserting with SQL: %s", insert) + connection.execute(insert, data) + return True def upsert( diff --git a/target_postgres/target.py b/target_postgres/target.py index d2830da..1d4bf5a 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -138,6 +138,17 @@ def __init__( th.StringType, description="Database name.", ), + th.Property( + "use_copy", + th.BooleanType, + default=False, + description=( + "Use the COPY command to insert data. This is usually faster than " + f"INSERT statements. This option is only available for the {PSYCOPG3} " + "dialect+driver." 
diff --git a/target_postgres/tests/conftest.py b/target_postgres/tests/conftest.py
new file mode 100644
index 0000000..55450d9
--- /dev/null
+++ b/target_postgres/tests/conftest.py
@@ -0,0 +1,8 @@
+"""Configuration for pytest."""
+
+import os
+
+
+def pytest_report_header():
+    """Add environment variables to the pytest report header."""
+    return [f"{var}: {value}" for var, value in os.environ.items() if var.startswith("TARGET_POSTGRES")]
diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py
index 34e17da..07ceb49 100644
--- a/target_postgres/tests/test_sdk.py
+++ b/target_postgres/tests/test_sdk.py
@@ -10,22 +10,18 @@
     TargetCamelcaseTest,
     TargetCliPrintsTest,
     TargetDuplicateRecords,
-    TargetEncodedStringData,
     TargetInvalidSchemaTest,
-    TargetMultipleStateMessages,
     TargetNoPrimaryKeys,
     TargetOptionalAttributes,
     TargetRecordBeforeSchemaTest,
     TargetRecordMissingKeyProperty,
     TargetRecordMissingOptionalFields,
-    TargetRecordMissingRequiredProperty,
     TargetSchemaNoProperties,
     TargetSchemaUpdates,
     TargetSpecialCharsInAttributes,
 )
 from target_postgres.target import TargetPostgres
-
 from .core import create_engine, postgres_config
 
 
 target_tests = TestSuite(
diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py
index 8e4a79e..ebeccce 100644
--- a/target_postgres/tests/test_target_postgres.py
+++ b/target_postgres/tests/test_target_postgres.py
@@ -158,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
     port = postgres_config_no_ssl["port"]
 
     config = {
-        "sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}"
+        "sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}"
     }
     tap = SampleTapCountries(config={}, state=None)
     target = TargetPostgres(config=config)
diff --git a/tox.ini b/tox.ini
index fa97c20..741be65 100644
--- a/tox.ini
+++ b/tox.ini
@@ -12,6 +12,8 @@
 min_version = 4
 # Run the python tests.
 # To execute, run `tox -e 3.13`
 envlist = 3.{9,10,11,12,13}
+pass_env =
+    TARGET_POSTGRES_*
 deps = pytest
 commands =
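The `sqlalchemy_url` test above now names the driver explicitly: under SQLAlchemy 2.x a bare `postgresql://` URL resolves to the psycopg2 dialect, while `postgresql+psycopg://` selects psycopg 3, whose raw connections expose the `cursor().copy(...)` API that `_do_copy` relies on. A small sketch of the distinction (credentials are placeholders; `create_engine` does not connect, so this runs without a database):

```python
# Sketch: selecting the psycopg 3 driver explicitly in a SQLAlchemy URL.
import sqlalchemy as sa

url = sa.engine.URL.create(
    drivername="postgresql+psycopg",  # a bare "postgresql" would mean psycopg2
    username="postgres",
    password="postgres",
    host="localhost",
    port=5432,
    database="postgres",
)
engine = sa.create_engine(url)
print(engine.dialect.driver)  # "psycopg"
```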