From a91c94b971e0cd3f3fe76153fe8891dda6e5155e Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Thu, 13 Jun 2024 17:19:45 -0600
Subject: [PATCH 01/12] Generate a copy statement

Create a variation of generate_insert_statement that returns a
PostgreSQL copy statement, suitable for bulk loading of data formatted
as csv.
---
 target_postgres/sinks.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index ea8b8df3..eb8bf622 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -259,6 +259,25 @@ def column_representation(
         )
         return columns
 
+    def generate_copy_statement(
+        self,
+        full_table_name: str,
+        columns: List[sa.Column],
+    ) -> str:
+        """Generate a copy statement for bulk copy.
+
+        Args:
+            full_table_name: the target table name.
+            columns: the target table columns.
+
+        Returns:
+            A copy statement.
+        """
+        columns_list = ", ".join((f'"{column.name}"' for column in columns))
+        sql: str = f"copy {full_table_name} ({columns_list}) from stdin with csv"
+
+        return sql
+
     def generate_insert_statement(
         self,
         full_table_name: str,
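For a hypothetical table users with columns id and email, the generated
statement would be:

    copy users ("id", "email") from stdin with csv

Column names are double-quoted here; quoting of the table name itself
comes later, in patch 05.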
From c11a9293c95792d53b258b42f3a34d4a21574370 Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Thu, 13 Jun 2024 17:33:04 -0600
Subject: [PATCH 02/12] Bulk insert data using copy

Use copy instead of insert to bulk insert records. In PostgreSQL, copy
is the fastest way to insert bulk data.
---
 target_postgres/sinks.py | 46 ++++++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 22 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index eb8bf622..6070566a 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -1,7 +1,9 @@
 """Postgres target sink class, which handles writing streams."""
 
+import csv
 import uuid
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Union, cast
+from io import StringIO
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union, cast
 
 import sqlalchemy as sa
 from pendulum import now
@@ -142,35 +144,35 @@ def bulk_insert_records(  # type: ignore[override]
             True if table exists, False if not, None if unsure or undetectable.
         """
         columns = self.column_representation(schema)
-        insert: str = cast(
-            str,
-            self.generate_insert_statement(
-                table.name,
-                columns,
-            ),
-        )
-        self.logger.info("Inserting with SQL: %s", insert)
+        copy_statement: str = self.generate_copy_statement(table.name, columns)
+        self.logger.info("Inserting with SQL: %s", copy_statement)
         # Only one record per PK, we want to take the last one
-        data_to_insert: List[Dict[str, Any]] = []
+        data_to_insert: Tuple[Tuple[Any]] = None
 
         if self.append_only is False:
-            insert_records: Dict[str, Dict] = {}  # pk : record
+            copy_values: Dict[str, Tuple] = {}  # pk : values
             for record in records:
-                insert_record = {}
-                for column in columns:
-                    insert_record[column.name] = record.get(column.name)
+                values = tuple((record.get(column.name) for column in columns))
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_value = "".join([str(record[key]) for key in primary_keys])
-                insert_records[primary_key_value] = insert_record
-            data_to_insert = list(insert_records.values())
+                copy_values[primary_key_value] = values
+            data_to_insert = tuple(copy_values.values())
         else:
-            for record in records:
-                insert_record = {}
-                for column in columns:
-                    insert_record[column.name] = record.get(column.name)
-                data_to_insert.append(insert_record)
-        connection.execute(insert, data_to_insert)
+            data_to_insert = [
+                tuple((record.get(column.name) for column in columns))
+                for record in records
+            ]
+
+        # Prepare a buffer with the values as csv.
+        buffer = StringIO()
+        writer = csv.writer(buffer)
+        writer.writerows(data_to_insert)
+        buffer.seek(0)
+
+        with connection.connection.cursor() as cur:
+            cur.copy_expert(sql=copy_statement, file=buffer)
+
         return True
 
     def upsert(
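The shape of this change, reduced to a standalone sketch. The DSN,
table, and rows are hypothetical, and psycopg2 is assumed as the driver
providing copy_expert:

    import psycopg2
    from io import StringIO

    # CSV rows, in the same column order as the copy statement.
    buffer = StringIO("1,alice\n2,bob\n")

    conn = psycopg2.connect("dbname=example")  # hypothetical DSN
    with conn.cursor() as cur:
        # COPY streams the whole buffer to the server in one operation,
        # instead of executing an INSERT statement per batch of records.
        cur.copy_expert(
            sql='copy "users" ("id", "name") from stdin with csv',
            file=buffer,
        )
    conn.commit()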
From 4af0c97574948e94ad5b54ba5bb77551dd9c3a9b Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Thu, 13 Jun 2024 17:52:25 -0600
Subject: [PATCH 03/12] Remove generate_insert_statement

The override of generate_insert_statement is no longer used.
---
 target_postgres/sinks.py | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 6070566a..34ef61ac 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -280,24 +280,6 @@ def generate_copy_statement(
 
         return sql
 
-    def generate_insert_statement(
-        self,
-        full_table_name: str,
-        columns: List[sa.Column],  # type: ignore[override]
-    ) -> Union[str, Executable]:
-        """Generate an insert statement for the given records.
-
-        Args:
-            full_table_name: the target table name.
-            columns: the target table columns.
-
-        Returns:
-            An insert statement.
-        """
-        metadata = sa.MetaData()
-        table = sa.Table(full_table_name, metadata, *columns)
-        return sa.insert(table)
-
     def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
         """Conforming names of tables, schemas, column names."""
         return name

From 943419bdff4ff9961ebd30ba15ee984f96212ed6 Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 10:09:15 -0600
Subject: [PATCH 04/12] Directly handle csv generation

Use the type bind processors to generate values to ensure that values
are represented correctly to PostgreSQL, especially for things like
ARRAY and JSONB. Also, handle null values.
---
 target_postgres/sinks.py | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 34ef61ac..132e498e 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -1,6 +1,5 @@
 """Postgres target sink class, which handles writing streams."""
 
-import csv
 import uuid
 from io import StringIO
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union, cast
@@ -164,10 +163,27 @@ def bulk_insert_records(  # type: ignore[override]
                 for record in records
             ]
 
-        # Prepare a buffer with the values as csv.
+        # Prepare processor functions for each column type. These are used to convert
+        # from Python values to database values.
+        column_processors = [
+            column.type.bind_processor(connection.dialect) or str for column in columns
+        ]
+
+        # Create a buffer of CSV formatted values to send in bulk.
         buffer = StringIO()
-        writer = csv.writer(buffer)
-        writer.writerows(data_to_insert)
+        for row in data_to_insert:
+            processed_row = ",".join(
+                map(
+                    lambda data, proc: (
+                        "" if data is None else str(proc(data)).replace('"', '""')
+                    ),
+                    row,
+                    column_processors,
+                )
+            )
+
+            buffer.write(processed_row)
+            buffer.write("\n")
         buffer.seek(0)
 
         with connection.connection.cursor() as cur:
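A note on bind processors: SQLAlchemy's TypeEngine.bind_processor(dialect)
returns a conversion callable for types whose Python values need
translating before they reach the driver, and None for pass-through
types, which is what the `or str` fallback covers. A minimal sketch
(the column is hypothetical):

    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql

    dialect = postgresql.dialect()
    column = sa.Column("tags", postgresql.ARRAY(sa.Text()))

    # Either a callable mapping a Python value to its database form,
    # or None when no conversion is needed.
    proc = column.type.bind_processor(dialect) or str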
From 734c9b504842f21746235391084f2b2e7a4ebbd9 Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 10:10:29 -0600
Subject: [PATCH 05/12] Quote table name

---
 target_postgres/sinks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 132e498e..cd1e1f40 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -292,7 +292,7 @@ def generate_copy_statement(
             A copy statement.
         """
         columns_list = ", ".join((f'"{column.name}"' for column in columns))
-        sql: str = f"copy {full_table_name} ({columns_list}) from stdin with csv"
+        sql: str = f'copy "{full_table_name}" ({columns_list}) from stdin with csv'
 
         return sql
 

From a97ab70b1c514767a4cefeaf2eda9a25adaa19ab Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 11:19:42 -0600
Subject: [PATCH 06/12] Add a comment about copy_expert

---
 target_postgres/sinks.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index cd1e1f40..694d7899 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -186,6 +186,8 @@ def bulk_insert_records(  # type: ignore[override]
             buffer.write("\n")
         buffer.seek(0)
 
+        # Use copy_expert to run the copy statement.
+        # https://www.psycopg.org/docs/cursor.html#cursor.copy_expert
         with connection.connection.cursor() as cur:
             cur.copy_expert(sql=copy_statement, file=buffer)
 

From 72f2b375577004670cafda17f3fcff2323ad7b76 Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 14:50:20 -0600
Subject: [PATCH 07/12] Improve csv generation

Always quote strings and handle array values.
---
 target_postgres/sinks.py | 56 ++++++++++++++++++++++++++++++-----------
 1 file changed, 43 insertions(+), 13 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 694d7899..42f61b43 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -2,7 +2,18 @@
 
 import uuid
 from io import StringIO
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union, cast
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+)
 
 import sqlalchemy as sa
 from pendulum import now
@@ -174,24 +185,43 @@ def bulk_insert_records(  # type: ignore[override]
             for record in records
         ]
 
-        # Prepare processor functions for each column type. These are used to convert
-        # from Python values to database values.
+        # Prepare to process the rows into csv. Use each column's bind_processor to do
+        # most of the work, then do the final construction of the csv rows ourselves
+        # to control exactly how values are converted and which ones are quoted.
         column_processors = [
             column.type.bind_processor(connection.dialect) or str for column in columns
         ]
 
-        # Create a buffer of CSV formatted values to send in bulk.
+        def process_column_value(data: Any, proc: Callable) -> str:
+            # If the data is null, return nothing (unquoted).
+            if data is None:
+                return ""
+
+            # Pass the Python value through the bind_processor.
+            value = proc(data)
+
+            # If the value is a string, escape double-quotes as "" and return
+            # a quoted value.
+            if isinstance(value, str):
+                # escape double quotes as "".
+                return '"' + value.replace('"', '""') + '"'
+
+            # If the value is a list (for ARRAY), escape double-quotes as \" and return
+            # a quoted value in literal array format.
+            if isinstance(value, list):
+                # for each member of value, escape double quotes as \".
+                return (
+                    '"{'
+                    + ",".join('""' + v.replace('"', r'\""') + '""' for v in value)
+                    + '}"'
+                )
+
+            # Otherwise, return the string representation of the value.
+            return str(value)
+
         buffer = StringIO()
         for row in data_to_insert:
-            processed_row = ",".join(
-                map(
-                    lambda data, proc: (
-                        "" if data is None else str(proc(data)).replace('"', '""')
-                    ),
-                    row,
-                    column_processors,
-                )
-            )
+            processed_row = ",".join(map(process_column_value, row, column_processors))
 
             buffer.write(processed_row)
             buffer.write("\n")
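The quoting rules above, restated as a self-contained sketch (quote_csv
is a hypothetical name, not part of the sink):

    def quote_csv(value) -> str:
        if value is None:
            return ""  # unquoted empty field, which COPY reads as NULL
        if isinstance(value, str):
            return '"' + value.replace('"', '""') + '"'
        if isinstance(value, list):  # rendered as a PostgreSQL array literal
            return (
                '"{'
                + ",".join('""' + v.replace('"', r'\""') + '""' for v in value)
                + '}"'
            )
        return str(value)

    quote_csv('say "hi"')  # '"say ""hi"""'
    quote_csv(["a", "b"])  # '"{""a"",""b""}"'
    quote_csv(None)        # ''
    quote_csv(42)          # '42'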
From e8b438cdd2d2a8af0bd18dd0220d6c1ae66baddd Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 14:52:06 -0600
Subject: [PATCH 08/12] Remove unused imports

---
 target_postgres/sinks.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 42f61b43..85148566 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -11,14 +11,12 @@
     Optional,
     Sequence,
     Tuple,
-    Union,
     cast,
 )
 
 import sqlalchemy as sa
 from pendulum import now
 from singer_sdk.sinks import SQLSink
-from sqlalchemy.sql import Executable
 from sqlalchemy.sql.expression import bindparam
 
 from target_postgres.connector import PostgresConnector

From 20cdb554e94e667853d4f975a7edb645bd9b841c Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 14 Jun 2024 16:14:33 -0600
Subject: [PATCH 09/12] Improve comment about unquoted null values

---
 target_postgres/sinks.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 85148566..8e00c209 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -180,7 +180,8 @@ def bulk_insert_records(  # type: ignore[override]
         ]
 
         def process_column_value(data: Any, proc: Callable) -> str:
-            # If the data is null, return nothing (unquoted).
+            # If the data is null, return an unquoted, empty value.
+            # Unquoted is important here, for PostgreSQL to interpret as null.
             if data is None:
                 return ""
 

From 534a38ac88b639347553a75b22e6e28564c9265b Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 28 Jun 2024 15:37:19 -0600
Subject: [PATCH 10/12] Escape backslashes in array values

---
 target_postgres/sinks.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 8e00c209..0f1523bb 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -179,6 +179,14 @@ def bulk_insert_records(  # type: ignore[override]
             column.type.bind_processor(connection.dialect) or str for column in columns
         ]
 
+        # Make translation table for escaping in array values.
+        array_translate_table = str.maketrans(
+            {
+                '"': '\\""',
+                "\\": "\\\\",
+            }
+        )
+
         def process_column_value(data: Any, proc: Callable) -> str:
             # If the data is null, return an unquoted, empty value.
             # Unquoted is important here, for PostgreSQL to interpret as null.
@@ -200,7 +208,9 @@ def process_column_value(data: Any, proc: Callable) -> str:
                 # for each member of value, escape double quotes as \".
                 return (
                     '"{'
-                    + ",".join('""' + v.replace('"', r'\""') + '""' for v in value)
+                    + ",".join(
+                        '""' + v.translate(array_translate_table) + '""' for v in value
+                    )
                     + '}"'
                 )
 

From ed4838f6793fa51cd75b29014039943b9bfa153b Mon Sep 17 00:00:00 2001
From: King Chung Huang
Date: Fri, 28 Jun 2024 15:48:06 -0600
Subject: [PATCH 11/12] Escape backslashes in string values

---
 target_postgres/sinks.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 0f1523bb..c59e312d 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -180,6 +180,12 @@ def bulk_insert_records(  # type: ignore[override]
         ]
 
         # Make translation table for escaping in array values.
+        str_translate_table = str.maketrans(
+            {
+                '"': '""',
+                "\\": "\\\\",
+            }
+        )
         array_translate_table = str.maketrans(
             {
                 '"': '\\""',
@@ -200,7 +206,7 @@ def process_column_value(data: Any, proc: Callable) -> str:
             # a quoted value.
             if isinstance(value, str):
                 # escape double quotes as "".
-                return '"' + value.replace('"', '""') + '"'
+                return '"' + value.translate(str_translate_table) + '"'
 
             # If the value is a list (for ARRAY), escape double-quotes as \" and return
             # a quoted value in literal array format.

From 12a85b20bec364dfd7835b7d7009c5d4d7bad514 Mon Sep 17 00:00:00 2001
From: Edgar Ramírez-Mondragón
Date: Wed, 14 Aug 2024 07:48:20 -0600
Subject: [PATCH 12/12] Update imports

---
 target_postgres/sinks.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 9eacb39e..d506abf9 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -6,10 +6,6 @@
 import typing as t
 import uuid
 from io import StringIO
-from typing import (
-    Any,
-    Callable,
-)
 
 import sqlalchemy as sa
 from singer_sdk.sinks import SQLSink
@@ -154,7 +150,7 @@ def bulk_insert_records(  # type: ignore[override]
         copy_statement: str = self.generate_copy_statement(table.name, columns)
         self.logger.info("Inserting with SQL: %s", copy_statement)
         # Only one record per PK, we want to take the last one
-        data_to_insert: tuple[tuple[Any, ...], ...]
+        data_to_insert: tuple[tuple[t.Any, ...], ...]
 
         if self.append_only is False:
             copy_values: dict[str, tuple] = {}  # pk : values
@@ -192,7 +188,7 @@ def bulk_insert_records(  # type: ignore[override]
             }
         )
 
-        def process_column_value(data: Any, proc: Callable) -> str:
+        def process_column_value(data: t.Any, proc: t.Callable) -> str:
             # If the data is null, return an unquoted, empty value.
             # Unquoted is important here, for PostgreSQL to interpret as null.
             if data is None:
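Patches 10 and 11 together extend the escaping from quotes to
backslashes as well. A standalone illustration of the two translation
tables (the sample strings are hypothetical):

    str_translate_table = str.maketrans({'"': '""', "\\": "\\\\"})
    array_translate_table = str.maketrans({'"': '\\""', "\\": "\\\\"})

    print('say "hi"'.translate(str_translate_table))    # say ""hi""
    print("C:\\dir".translate(str_translate_table))     # C:\\dir
    print('say "hi"'.translate(array_translate_table))  # say \""hi\""

Array members are additionally wrapped in "" and the whole list in
"{...}" by process_column_value, per patch 07.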