17
17
import simplejson
18
18
import sqlalchemy as sa
19
19
from singer_sdk import SQLConnector
20
- from singer_sdk import typing as th
20
+ from singer_sdk . connectors . sql import JSONSchemaToSQL
21
21
from sqlalchemy .dialects .postgresql import ARRAY , BIGINT , BYTEA , JSONB , UUID
22
22
from sqlalchemy .engine import URL
23
23
from sqlalchemy .engine .url import make_url
30
30
TEXT ,
31
31
TIME ,
32
32
TIMESTAMP ,
33
- VARCHAR ,
34
33
TypeDecorator ,
35
34
)
36
35
from sshtunnel import SSHTunnelForwarder
39
38
from singer_sdk .connectors .sql import FullyQualifiedName
40
39
41
40
41
+ class JSONSchemaToPostgres (JSONSchemaToSQL ):
42
+ """Convert JSON Schema types to Postgres types."""
43
+
44
+ def __init__ (self , * , content_encoding : bool = True ) -> None :
45
+ """Initialize the JSONSchemaToPostgres instance."""
46
+ super ().__init__ ()
47
+ self .content_encoding = content_encoding
48
+
49
+ def handle_raw_string (self , schema ):
50
+ """Handle a raw string type."""
51
+ if self .content_encoding and schema .get ("contentEncoding" ) == "base16" :
52
+ return HexByteString ()
53
+
54
+ return TEXT ()
55
+
56
+
42
57
class PostgresConnector (SQLConnector ):
43
58
"""Sets up SQL Alchemy, and other Postgres related stuff."""
44
59
@@ -214,7 +229,50 @@ def clone_table(
214
229
new_table .create (bind = connection )
215
230
return new_table
216
231
217
- def to_sql_type (self , jsonschema_type : dict ) -> sa .types .TypeEngine : # type: ignore[override]
232
+ def _handle_array_type (self , jsonschema : dict ) -> ARRAY | JSONB :
233
+ """Handle array type."""
234
+ items = jsonschema .get ("items" )
235
+ # Case 1: items is a string
236
+ if isinstance (items , str ):
237
+ return ARRAY (self .to_sql_type ({"type" : items }))
238
+
239
+ # Case 2: items are more complex
240
+ if isinstance (items , dict ):
241
+ # Case 2.1: items are variants
242
+ if "type" not in items :
243
+ return ARRAY (JSONB ())
244
+
245
+ items_type = items ["type" ]
246
+
247
+ # Case 2.2: items are a single type
248
+ if isinstance (items_type , str ):
249
+ return ARRAY (self .to_sql_type ({"type" : items_type }))
250
+
251
+ # Case 2.3: items are a list of types
252
+ if isinstance (items_type , list ):
253
+ return ARRAY (self .to_sql_type ({"type" : items_type }))
254
+
255
+ # Case 3: tuples
256
+ return ARRAY (JSONB ()) if isinstance (items , list ) else JSONB ()
257
+
258
+ @cached_property
259
+ def jsonschema_to_sql (self ) -> JSONSchemaToSQL :
260
+ """Return a JSONSchemaToSQL instance with custom type handling."""
261
+ to_sql = JSONSchemaToPostgres (content_encoding = self .interpret_content_encoding )
262
+ to_sql .fallback_type = TEXT
263
+ to_sql .register_type_handler ("integer" , BIGINT )
264
+ to_sql .register_type_handler ("object" , JSONB )
265
+ to_sql .register_type_handler ("array" , self ._handle_array_type )
266
+ to_sql .register_format_handler ("date-time" , TIMESTAMP )
267
+ to_sql .register_format_handler ("uuid" , UUID )
268
+ to_sql .register_format_handler ("email" , TEXT )
269
+ to_sql .register_format_handler ("uri" , TEXT )
270
+ to_sql .register_format_handler ("hostname" , TEXT )
271
+ to_sql .register_format_handler ("ipv4" , TEXT )
272
+ to_sql .register_format_handler ("ipv6" , TEXT )
273
+ return to_sql
274
+
275
+ def to_sql_type (self , jsonschema_type : dict ) -> sa .types .TypeEngine :
218
276
"""Return a JSON Schema representation of the provided type.
219
277
220
278
By default will call `typing.to_sql_type()`.
@@ -270,7 +328,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig
270
328
271
329
return PostgresConnector .pick_best_sql_type (sql_type_array = sql_type_array )
272
330
273
- def pick_individual_type (self , jsonschema_type : dict ): # noqa: PLR0911
331
+ def pick_individual_type (self , jsonschema_type : dict ):
274
332
"""Select the correct sql type assuming jsonschema_type has only a single type.
275
333
276
334
Args:
@@ -281,47 +339,8 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
281
339
"""
282
340
if "null" in jsonschema_type ["type" ]:
283
341
return None
284
- if "integer" in jsonschema_type ["type" ]:
285
- return BIGINT ()
286
- if "object" in jsonschema_type ["type" ]:
287
- return JSONB ()
288
- if "array" in jsonschema_type ["type" ]:
289
- items = jsonschema_type .get ("items" )
290
- # Case 1: items is a string
291
- if isinstance (items , str ):
292
- return ARRAY (self .to_sql_type ({"type" : items }))
293
-
294
- # Case 2: items are more complex
295
- if isinstance (items , dict ):
296
- # Case 2.1: items are variants
297
- if "type" not in items :
298
- return ARRAY (JSONB ())
299
-
300
- items_type = items ["type" ]
301
-
302
- # Case 2.2: items are a single type
303
- if isinstance (items_type , str ):
304
- return ARRAY (self .to_sql_type ({"type" : items_type }))
305
-
306
- # Case 2.3: items are a list of types
307
- if isinstance (items_type , list ):
308
- return ARRAY (self .to_sql_type ({"type" : items_type }))
309
-
310
- # Case 3: tuples
311
- return ARRAY (JSONB ()) if isinstance (items , list ) else JSONB ()
312
-
313
- # string formats
314
- if jsonschema_type .get ("format" ) == "date-time" :
315
- return TIMESTAMP ()
316
- if jsonschema_type .get ("format" ) == "uuid" :
317
- return UUID ()
318
- if (
319
- self .interpret_content_encoding
320
- and jsonschema_type .get ("contentEncoding" ) == "base16"
321
- ):
322
- return HexByteString ()
323
- individual_type = th .to_sql_type (jsonschema_type )
324
- return TEXT () if isinstance (individual_type , VARCHAR ) else individual_type
342
+
343
+ return self .jsonschema_to_sql .to_sql_type (jsonschema_type )
325
344
326
345
@staticmethod
327
346
def pick_best_sql_type (sql_type_array : list ):
0 commit comments