diff --git a/.gitignore b/.gitignore index 56313116..4f8c62d2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ smoke-test .meltano/** .tox/** .secrets/** +.idea .vscode/** output/** .env diff --git a/target_postgres/connector.py b/target_postgres/connector.py index d6730539..927d2ed4 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -12,24 +12,11 @@ import paramiko import simplejson -import sqlalchemy +import sqlalchemy as sa from singer_sdk import SQLConnector from singer_sdk import typing as th from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB -from sqlalchemy.engine import URL -from sqlalchemy.engine.url import make_url -from sqlalchemy.types import ( - BOOLEAN, - DATE, - DATETIME, - DECIMAL, - INTEGER, - TEXT, - TIME, - TIMESTAMP, - VARCHAR, - TypeDecorator, -) +from sqlalchemy.sql.type_api import TypeEngine from sshtunnel import SSHTunnelForwarder @@ -48,7 +35,7 @@ def __init__(self, config: dict) -> None: Args: config: Configuration for the connector. """ - url: URL = make_url(self.get_sqlalchemy_url(config=config)) + url: sa.engine.URL = sa.engine.make_url(self.get_sqlalchemy_url(config=config)) ssh_config = config.get("ssh_tunnel", {}) self.ssh_tunnel: SSHTunnelForwarder @@ -84,10 +71,10 @@ def prepare_table( # type: ignore[override] full_table_name: str, schema: dict, primary_keys: list[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Adapt target table to provided schema if possible. Args: @@ -102,8 +89,8 @@ def prepare_table( # type: ignore[override] The table object. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + table: sa.Table if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( table_name=table_name, @@ -144,10 +131,10 @@ def prepare_table( # type: ignore[override] def copy_table_structure( self, full_table_name: str, - from_table: sqlalchemy.Table, - connection: sqlalchemy.engine.Connection, + from_table: sa.Table, + connection: sa.engine.Connection, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Copy table structure. Args: @@ -160,58 +147,56 @@ def copy_table_structure( The new table object. 
""" _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - new_table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + new_table: sa.Table columns = [] if self.table_exists(full_table_name=full_table_name): raise RuntimeError("Table already exists") for column in from_table.columns: columns.append(column._copy()) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table else: - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table @contextmanager - def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: - with self._engine.connect().execution_options() as conn: + def _connect(self) -> t.Iterator[sa.engine.Connection]: + engine = self._engine + with engine.connect().execution_options() as conn: yield conn + engine.dispose() - def drop_table( - self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection - ): + def drop_table(self, table: sa.Table, connection: sa.engine.Connection): """Drop table data.""" table.drop(bind=connection) def clone_table( self, new_table_name, table, metadata, connection, temp_table - ) -> sqlalchemy.Table: + ) -> sa.Table: """Clone a table.""" new_columns = [] for column in table.columns: new_columns.append( - sqlalchemy.Column( + sa.Column( column.name, column.type, ) ) if temp_table is True: - new_table = sqlalchemy.Table( + new_table = sa.Table( new_table_name, metadata, *new_columns, prefixes=["TEMPORARY"] ) else: - new_table = sqlalchemy.Table(new_table_name, metadata, *new_columns) + new_table = sa.Table(new_table_name, metadata, *new_columns) new_table.create(bind=connection) return new_table @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + def to_sql_type(jsonschema_type: dict) -> TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -280,10 +265,10 @@ def pick_individual_type(jsonschema_type: dict): if "array" in jsonschema_type["type"]: return ARRAY(JSONB()) if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() + return sa.TIMESTAMP() individual_type = th.to_sql_type(jsonschema_type) - if isinstance(individual_type, VARCHAR): - return TEXT() + if isinstance(individual_type, sa.VARCHAR): + return sa.TEXT() return individual_type @staticmethod @@ -299,15 +284,15 @@ def pick_best_sql_type(sql_type_array: list): precedence_order = [ ARRAY, JSONB, - TEXT, - TIMESTAMP, - DATETIME, - DATE, - TIME, - DECIMAL, + sa.TEXT, + sa.TIMESTAMP, + sa.DATETIME, + sa.DATE, + sa.TIME, + sa.DECIMAL, BIGINT, - INTEGER, - BOOLEAN, + sa.INTEGER, + sa.BOOLEAN, NOTYPE, ] @@ -315,22 +300,22 @@ def pick_best_sql_type(sql_type_array: list): for obj in sql_type_array: if isinstance(obj, sql_type): return obj - return TEXT() + return sa.TEXT() def create_empty_table( # type: ignore[override] self, table_name: str, - meta: sqlalchemy.MetaData, + meta: sa.MetaData, schema: dict, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, primary_keys: list[str] | None = None, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Create an empty target table. Args: - full_table_name: the target table name. 
+ table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. @@ -340,7 +325,7 @@ def create_empty_table( # type: ignore[override] NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. """ - columns: list[sqlalchemy.Column] = [] + columns: list[sa.Column] = [] primary_keys = primary_keys or [] try: properties: dict = schema["properties"] @@ -353,7 +338,7 @@ def create_empty_table( # type: ignore[override] for property_name, property_jsonschema in properties.items(): is_primary_key = property_name in primary_keys columns.append( - sqlalchemy.Column( + sa.Column( property_name, self.to_sql_type(property_jsonschema), primary_key=is_primary_key, @@ -361,24 +346,22 @@ def create_empty_table( # type: ignore[override] ) ) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table def prepare_column( # type: ignore[override] self, schema_name: str, - table: sqlalchemy.Table, + table: sa.Table, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None = None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None = None, ) -> None: """Adapt target table to provided schema if possible. @@ -395,7 +378,7 @@ def prepare_column( # type: ignore[override] if not column_exists: self._create_empty_column( - # We should migrate every function to use sqlalchemy.Table + # We should migrate every function to use sa.Table # instead of having to know what the function wants table_name=table.name, column_name=column_name, @@ -419,13 +402,13 @@ def _create_empty_column( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, + sql_type: TypeEngine, + connection: sa.engine.Connection, ) -> None: """Create a new column. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. @@ -449,8 +432,8 @@ def get_column_add_ddl( # type: ignore[override] table_name: str, schema_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the create column DDL statement. Args: @@ -462,9 +445,9 @@ def get_column_add_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. 
""" - column = sqlalchemy.Column(column_name, column_type) + column = sa.Column(column_name, column_type) - return sqlalchemy.DDL( + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ADD COLUMN %(column_name)s %(column_type)s" @@ -482,23 +465,23 @@ def _adapt_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None, ) -> None: """Adapt table column type to support the new JSON schema type. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. Raises: NotImplementedError: if altering columns is not supported. """ - current_type: sqlalchemy.types.TypeEngine + current_type: TypeEngine if column_object is not None: - current_type = t.cast(sqlalchemy.types.TypeEngine, column_object.type) + current_type = t.cast(TypeEngine, column_object.type) else: current_type = self._get_column_type( schema_name=schema_name, @@ -549,8 +532,8 @@ def get_column_alter_ddl( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the alter column DDL statement. Override this if your database uses a different syntax for altering columns. @@ -563,8 +546,8 @@ def get_column_alter_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ - column = sqlalchemy.Column(column_name, column_type) - return sqlalchemy.DDL( + column = sa.Column(column_name, column_type) + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ALTER COLUMN %(column_name)s %(column_type)s" @@ -587,7 +570,7 @@ def get_sqlalchemy_url(self, config: dict) -> str: return cast(str, config["sqlalchemy_url"]) else: - sqlalchemy_url = URL.create( + sqlalchemy_url = sa.engine.URL.create( drivername=config["dialect+driver"], username=config["user"], password=config["password"], @@ -715,12 +698,12 @@ def _get_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, - ) -> sqlalchemy.types.TypeEngine: + connection: sa.engine.Connection, + ) -> TypeEngine: """Get the SQL type of the declared column. Args: - full_table_name: The name of the table. + table_name: The name of the table. column_name: The name of the column. Returns: @@ -742,15 +725,15 @@ def _get_column_type( # type: ignore[override] ) raise KeyError(msg) from ex - return t.cast(sqlalchemy.types.TypeEngine, column.type) + return t.cast(TypeEngine, column.type) def get_table_columns( # type: ignore[override] self, schema_name: str, table_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, column_names: list[str] | None = None, - ) -> dict[str, sqlalchemy.Column]: + ) -> dict[str, sa.Column]: """Return a list of table columns. Overrode to support schema_name @@ -763,11 +746,11 @@ def get_table_columns( # type: ignore[override] Returns: An ordered list of column objects. 
""" - inspector = sqlalchemy.inspect(connection) + inspector = sa.inspect(connection) columns = inspector.get_columns(table_name, schema_name) return { - col_meta["name"]: sqlalchemy.Column( + col_meta["name"]: sa.Column( col_meta["name"], col_meta["type"], nullable=col_meta.get("nullable", False), @@ -781,7 +764,7 @@ def column_exists( # type: ignore[override] self, full_table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> bool: """Determine if the target column already exists. @@ -800,10 +783,10 @@ def column_exists( # type: ignore[override] ) -class NOTYPE(TypeDecorator): +class NOTYPE(sa.TypeDecorator): """Type to use when none is provided in the schema.""" - impl = TEXT + impl = sa.TEXT cache_ok = True def process_bind_param(self, value, dialect): @@ -822,4 +805,4 @@ def python_type(self): def as_generic(self, *args: t.Any, **kwargs: t.Any): """Return the generic type for this column.""" - return TEXT() + return sa.TEXT() diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 5b5f7c99..1b9a7c71 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -3,12 +3,9 @@ import uuid from typing import Any, Dict, Iterable, List, Optional, Union, cast -import sqlalchemy +import sqlalchemy as sa from pendulum import now from singer_sdk.sinks import SQLSink -from sqlalchemy import Column, MetaData, Table, insert, select, update -from sqlalchemy.sql import Executable -from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -72,10 +69,10 @@ def process_batch(self, context: dict) -> None: Args: context: Stream partition or context dictionary. """ - # Use one connection so we do this all in a single transaction + # Use one connection, so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): # Check structure of table - table: sqlalchemy.Table = self.connector.prepare_table( + table: sa.Table = self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, primary_keys=self.key_properties, @@ -83,7 +80,7 @@ def process_batch(self, context: dict) -> None: connection=connection, ) # Create a temp table (Creates from the table above) - temp_table: sqlalchemy.Table = self.connector.copy_table_structure( + temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, from_table=table, as_temp_table=True, @@ -119,11 +116,11 @@ def generate_temp_table_name(self): def bulk_insert_records( # type: ignore[override] self, - table: sqlalchemy.Table, + table: sa.Table, schema: dict, records: Iterable[Dict[str, Any]], primary_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Bulk insert records to an existing destination table. @@ -174,11 +171,11 @@ def bulk_insert_records( # type: ignore[override] def upsert( self, - from_table: sqlalchemy.Table, - to_table: sqlalchemy.Table, + from_table: sa.Table, + to_table: sa.Table, schema: dict, join_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Merge upsert data from one table to another. 
@@ -196,33 +193,33 @@ def upsert( """ if self.append_only is True: # Insert - select_stmt = select(from_table.columns).select_from(from_table) + select_stmt = sa.select(from_table.columns).select_from(from_table) insert_stmt = to_table.insert().from_select( names=from_table.columns, select=select_stmt ) connection.execute(insert_stmt) else: join_predicates = [] - to_table_key: sqlalchemy.Column + to_table_key: sa.Column for key in join_keys: - from_table_key: sqlalchemy.Column = from_table.columns[key] + from_table_key: sa.Column = from_table.columns[key] to_table_key = to_table.columns[key] join_predicates.append(from_table_key == to_table_key) - join_condition = sqlalchemy.and_(*join_predicates) + join_condition = sa.and_(*join_predicates) where_predicates = [] for key in join_keys: to_table_key = to_table.columns[key] where_predicates.append(to_table_key.is_(None)) - where_condition = sqlalchemy.and_(*where_predicates) + where_condition = sa.and_(*where_predicates) select_stmt = ( - select(from_table.columns) + sa.select(from_table.columns) .select_from(from_table.outerjoin(to_table, join_condition)) .where(where_condition) ) - insert_stmt = insert(to_table).from_select( + insert_stmt = sa.insert(to_table).from_select( names=from_table.columns, select=select_stmt ) @@ -232,11 +229,13 @@ def upsert( where_condition = join_condition update_columns = {} for column_name in self.schema["properties"].keys(): - from_table_column: sqlalchemy.Column = from_table.columns[column_name] - to_table_column: sqlalchemy.Column = to_table.columns[column_name] + from_table_column: sa.Column = from_table.columns[column_name] + to_table_column: sa.Column = to_table.columns[column_name] update_columns[to_table_column] = from_table_column - update_stmt = update(to_table).where(where_condition).values(update_columns) + update_stmt = ( + sa.update(to_table).where(where_condition).values(update_columns) + ) connection.execute(update_stmt) return None @@ -244,12 +243,12 @@ def upsert( def column_representation( self, schema: dict, - ) -> List[Column]: + ) -> List[sa.Column]: """Return a sqlalchemy table representation for the current schema.""" - columns: list[Column] = [] + columns: list[sa.Column] = [] for property_name, property_jsonschema in schema["properties"].items(): columns.append( - Column( + sa.Column( property_name, self.connector.to_sql_type(property_jsonschema), ) @@ -259,8 +258,8 @@ def column_representation( def generate_insert_statement( self, full_table_name: str, - columns: List[Column], # type: ignore[override] - ) -> Union[str, Executable]: + columns: List[sa.Column], # type: ignore[override] + ) -> Union[str, sa.sql.Executable]: """Generate an insert statement for the given records. Args: @@ -270,9 +269,9 @@ def generate_insert_statement( Returns: An insert statement. 
""" - metadata = MetaData() - table = Table(full_table_name, metadata, *columns) - return insert(table) + metadata = sa.MetaData() + table = sa.Table(full_table_name, metadata, *columns) + return sa.insert(table) def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conforming names of tables, schemas, column names.""" @@ -343,8 +342,8 @@ def activate_version(self, new_version: int) -> None: connection=connection, ) - metadata = MetaData() - target_table = Table( + metadata = sa.MetaData() + target_table = sa.Table( self.table_name, metadata, autoload_with=connection.engine, @@ -353,8 +352,8 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: - delete_stmt = sqlalchemy.delete(target_table).where( - sqlalchemy.or_( + delete_stmt = sa.delete(target_table).where( + sa.or_( target_table.c[self.version_column_name].is_(None), target_table.c[self.version_column_name] <= new_version, ) @@ -375,19 +374,19 @@ def activate_version(self, new_version: int) -> None: ) # Need to deal with the case where data doesn't exist for the version column update_stmt = ( - update(target_table) + sa.update(target_table) .values( { - target_table.c[self.soft_delete_column_name]: bindparam( + target_table.c[self.soft_delete_column_name]: sa.bindparam( "deletedate" ) } ) .where( - sqlalchemy.and_( - sqlalchemy.or_( + sa.and_( + sa.or_( target_table.c[self.version_column_name] - < bindparam("version"), + < sa.bindparam("version"), target_table.c[self.version_column_name].is_(None), ), target_table.c[self.soft_delete_column_name].is_(None), diff --git a/target_postgres/tests/core.py b/target_postgres/tests/core.py index ba5662be..dc0ece69 100644 --- a/target_postgres/tests/core.py +++ b/target_postgres/tests/core.py @@ -1,6 +1,6 @@ """ Config and base values for target-postgres testing """ # flake8: noqa -import sqlalchemy +import sqlalchemy as sa from target_postgres.target import TargetPostgres @@ -53,7 +53,7 @@ def postgres_config_ssh_tunnel(): } -def create_engine(target_postgres: TargetPostgres) -> sqlalchemy.engine.Engine: +def create_engine(target_postgres: TargetPostgres) -> sa.engine.Engine: return TargetPostgres.default_sink_class.connector_class( config=target_postgres.config )._engine diff --git a/target_postgres/tests/data_files/array_boolean.singer b/target_postgres/tests/data_files/array_boolean.singer new file mode 100644 index 00000000..268a64a0 --- /dev/null +++ b/target_postgres/tests/data_files/array_boolean.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}} +{"type": "STATE", "value": {"array_boolean": 3}} diff --git a/target_postgres/tests/data_files/array_data.singer b/target_postgres/tests/data_files/array_data.singer deleted file mode 100644 index 0d132ac6..00000000 --- a/target_postgres/tests/data_files/array_data.singer +++ /dev/null @@ -1,6 +0,0 @@ -{"type": "SCHEMA", "stream": "test_carts", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": 
{"id": {"type": "integer"}, "fruits": {"type": "array","items": {"type": "string"}}}}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 1, "fruits": [ "apple", "orange", "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 2, "fruits": [ "banana", "apple" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 3, "fruits": [ "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 4, "fruits": [ "orange", "banana", "apple", "pear" ]}} -{"type": "STATE", "value": {"test_carts": 4}} diff --git a/target_postgres/tests/data_files/array_number.singer b/target_postgres/tests/data_files/array_number.singer new file mode 100644 index 00000000..4eac276e --- /dev/null +++ b/target_postgres/tests/data_files/array_number.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}} +{"type": "STATE", "value": {"array_number": 3}} diff --git a/target_postgres/tests/data_files/array_string.singer b/target_postgres/tests/data_files/array_string.singer new file mode 100644 index 00000000..f14e7870 --- /dev/null +++ b/target_postgres/tests/data_files/array_string.singer @@ -0,0 +1,6 @@ +{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}} +{"type": "STATE", "value": {"array_string": 4}} diff --git a/target_postgres/tests/data_files/array_timestamp.singer b/target_postgres/tests/data_files/array_timestamp.singer new file mode 100644 index 00000000..e5192cec --- /dev/null +++ b/target_postgres/tests/data_files/array_timestamp.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}} +{"type": "STATE", "value": {"array_timestamp": 3}} diff --git a/target_postgres/tests/data_files/object_mixed.singer b/target_postgres/tests/data_files/object_mixed.singer new file mode 100644 index 00000000..2ed86261 --- /dev/null +++ b/target_postgres/tests/data_files/object_mixed.singer @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "object_mixed", 
"key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}} +{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, "float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}} +{"type": "STATE", "value": {"object_mixed": 1}} diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py index 3f95c393..5d4207ad 100644 --- a/target_postgres/tests/test_sdk.py +++ b/target_postgres/tests/test_sdk.py @@ -61,7 +61,9 @@ class BasePostgresSDKTests: @pytest.fixture() def connection(self, runner): engine = create_engine(runner) - return engine.connect() + with engine.connect() as connection: + yield connection + engine.dispose() SDKTests = get_target_test_class( diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 1eaa9978..9b04a738 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -8,11 +8,10 @@ import jsonschema import pytest -import sqlalchemy +import sqlalchemy as sa from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end -from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy.types import TEXT, TIMESTAMP +from sqlalchemy.dialects.postgresql import ARRAY, JSONB from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -28,6 +27,8 @@ postgres_config_ssh_tunnel, ) +METADATA_COLUMN_PREFIX = "_sdc" + # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly @pytest.fixture(scope="session", name="postgres_config") @@ -75,63 +76,112 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row - - -def verify_data( - target: TargetPostgres, - table_name: str, - number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, -): - """Checks whether the data in a table matches a provided data sample. - - Args: - target: The target to obtain a database connection from. - full_table_name: The schema and table name of the table to check data for. - primary_key: The primary key of the table. - number_of_rows: The expected number of rows that should be in the table. - check_data: A dictionary representing the full contents of the first row in the - table, as determined by lowest primary_key value, or else a list of - dictionaries representing every row in the table. 
- """ - engine = create_engine(target) - full_table_name = f"{target.config['default_target_schema']}.{table_name}" - with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" +class AssertionHelper: + def __init__(self, target: TargetPostgres, metadata_column_prefix: str): + self.target = target + self.metadata_column_prefix = metadata_column_prefix + self.engine = create_engine(self.target) + + def remove_metadata_columns(self, row: dict) -> dict: + new_row = {} + for column in row.keys(): + if not column.startswith(self.metadata_column_prefix): + new_row[column] = row[column] + return new_row + + def verify_data( + self, + table_name: str, + number_of_rows: int = 1, + primary_key: str | None = None, + check_data: dict | list[dict] | None = None, + ): + """Checks whether the data in a table matches a provided data sample. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table. + number_of_rows: The expected number of rows that should be in the table. + check_data: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value, or else a list of + dictionaries representing every row in the table. + """ + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with self.engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sa.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sa.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") - ) - assert result.first()[0] == number_of_rows + result = connection.execute( + sa.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. 
+ check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + schema = self.target.config["default_target_schema"] + with self.engine.connect() as connection: + meta = sa.MetaData() + table = sa.Table(table_name, meta, schema=schema, autoload_with=connection) + for column in table.c: + # Ignore `_sdc` metadata columns when veriying table schema. + if column.name.startswith(self.metadata_column_prefix): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + + def __del__(self): + self.engine.dispose() + + +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -171,7 +221,7 @@ def test_port_default_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5432/{database}" @@ -196,7 +246,7 @@ def test_port_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5433/{database}" @@ -248,11 +298,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -272,7 +322,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -284,16 +334,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + 
helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -310,7 +360,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -367,18 +417,18 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def test_no_primary_keys(postgres_target, helper): """We run both of these tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) @@ -391,7 +441,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -399,21 +449,97 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_data(postgres_target): - file_name = "array_data.singer" +def test_array_boolean(postgres_target, helper): + file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) - row = {"id": 1, "fruits": ["apple", "orange", "pear"]} - verify_data(postgres_target, "test_carts", 4, "id", row) + row = {"id": 1, "value": [True, False]} + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( + "array_boolean", + check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": ARRAY}, + }, + ) -def test_encoded_string_data(postgres_target): +def test_array_number(postgres_target, helper): + file_name = "array_number.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( + "array_number", + check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_string(postgres_target, helper): + file_name = "array_string.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["apple", "orange", "pear"]} + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( + "array_string", + 
check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_timestamp(postgres_target, helper): + file_name = "array_timestamp.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( + "array_timestamp", + check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_object_mixed(postgres_target, helper): + file_name = "object_mixed.singer" + singer_file_to_target(file_name, postgres_target) + row = { + "id": 1, + "value": { + "string": "foo", + "integer": 42, + "float": Decimal("42.42"), + "timestamp": "2023-12-13T01:15:02", + "array_boolean": [True, False], + "array_float": [Decimal("42.42"), Decimal("84.84")], + "array_integer": [42, 84], + "array_string": ["foo", "bar"], + "nested_object": {"foo": "bar"}, + }, + } + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( + "object_mixed", + check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": JSONB}, + }, + ) + + +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -426,11 +552,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -454,43 +580,33 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" - engine = create_engine(postgres_target) table_name = "commits" file_name = f"{table_name}.singer" - schema = postgres_target.config["default_target_schema"] singer_file_to_target(file_name, postgres_target) - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - "commits", meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # {"type":"string"} - if column.name == "id": - assert isinstance(column.type, TEXT) + helper.verify_schema( + table_name, + check_columns={ + # {"type":"string"} + "id": {"type": sa.TEXT}, # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} - if column.name in {"authored_date", "committed_date"}: - assert isinstance(column.type, TIMESTAMP) - + "authored_date": {"type": sa.TIMESTAMP}, + "committed_date": {"type": sa.TIMESTAMP}, # Any of nullable array of strings or single string. 
# {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} - if column.name == "parent_ids": - assert isinstance(column.type, ARRAY) - + "parent_ids": {"type": ARRAY}, # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} - if column.name == "commit_message": - assert isinstance(column.type, TEXT) - + "commit_message": {"type": sa.TEXT}, # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - if column.name == "legacy_id": - assert isinstance(column.type, TEXT) + "legacy_id": {"type": sa.TEXT}, + }, + ) def test_new_array_column(postgres_target): @@ -510,29 +626,29 @@ def test_activate_version_hard_delete(postgres_config_no_ssl): engine = create_engine(pg_hard_delete_true) singer_file_to_target(file_name, pg_hard_delete_true) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_hard_delete_true) # Should remove the 2 records we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 @@ -543,43 +659,41 @@ def test_activate_version_soft_delete(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." 
+ table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = False pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_soft_delete) # Should have all records including the 2 we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + sa.text( + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2 @@ -592,9 +706,7 @@ def test_activate_version_deletes_data_properly(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." 
+ table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = True @@ -602,27 +714,27 @@ def test_activate_version_deletes_data_properly(postgres_target): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table file_name = f"{table_name}_2.singer" singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 0 @@ -663,7 +775,7 @@ def test_postgres_ssl_no_config(postgres_config_no_ssl): postgres_config_modified = copy.deepcopy(postgres_config_no_ssl) postgres_config_modified["port"] = 5432 - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -689,8 +801,8 @@ def test_postgres_ssl_public_pkey(postgres_config): postgres_config_modified["ssl_client_private_key"] = "./ssl/public_pkey.key" # If the private key exists but access is too public, the target won't fail until - # the it attempts to establish a connection to the database. - with pytest.raises(sqlalchemy.exc.OperationalError): + # it attempts to establish a connection to the database. + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -719,7 +831,7 @@ def test_postgres_ssl_invalid_cn(postgres_config): postgres_config_modified["host"] = "127.0.0.1" postgres_config_modified["ssl_mode"] = "verify-full" - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -752,7 +864,7 @@ def test_postgres_ssl_unsupported(postgres_config): postgres_config_modified = copy.deepcopy(postgres_config) postgres_config_modified["port"] = 5433 # Alternate service: postgres_no_ssl - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target)
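For reviewers who want to exercise the new test plumbing locally, below is a minimal sketch of what one more per-type test could look like on top of this change. It reuses singer_file_to_target, the new helper fixture (AssertionHelper.verify_data / verify_schema), and the sa alias adopted throughout this diff; the array_integer stream, its .singer data file, and the expected row/count are hypothetical illustrations and are not part of this PR.

# Hypothetical follow-up test, modeled on test_array_number / test_array_boolean above.
# Assumes a target_postgres/tests/data_files/array_integer.singer fixture exists with
# the same shape as the other array_* fixtures added in this PR (it does not yet).
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ARRAY


def test_array_integer(postgres_target, helper):
    file_name = "array_integer.singer"
    singer_file_to_target(file_name, postgres_target)

    # First row by primary key; the helper strips the _sdc* metadata columns.
    row = {"id": 1, "value": [1, 2, 3]}
    helper.verify_data("array_integer", 3, "id", row)

    # JSON Schema arrays are mapped to Postgres ARRAY columns by the connector,
    # and integer ids come through as BIGINT.
    helper.verify_schema(
        "array_integer",
        check_columns={
            "id": {"type": sa.BIGINT},
            "value": {"type": ARRAY},
        },
    )

Like the tests added in this diff, the sketch checks both the loaded rows and the resulting column types, so a regression in type mapping and a regression in data loading would surface in the same test.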