@@ -211,77 +211,20 @@ def max_record_count(self) -> int | None:
         """Return the maximum number of records to fetch in a single query."""
         return self.config.get("max_record_count")
 
-    def build_query(self, context: Context | None) -> sa.sql.Select:
-        """Build a SQLAlchemy query for the stream."""
-        selected_column_names = self.get_selected_schema()["properties"].keys()
-        table = self.connector.get_table(
-            full_table_name=self.fully_qualified_name,
-            column_names=selected_column_names,
-        )
-        query = table.select()
-
-        if self.replication_key:
-            replication_key_col = table.columns[self.replication_key]
-            order_by = (
-                sa.nulls_first(replication_key_col.asc())
-                if self.supports_nulls_first
-                else replication_key_col.asc()
-            )
-            query = query.order_by(order_by)
-
-            start_val = self.get_starting_replication_key_value(context)
-            if start_val:
-                query = query.where(replication_key_col >= start_val)
-
+    def apply_query_filters(
+        self,
+        query: sa.sql.Select,
+        table: sa.Table,
+        *,
+        context: Context | None = None,
+    ) -> sa.sql.Select:
+        """Apply query filters to the query."""
+        query = super().apply_query_filters(query, table, context=context)
         stream_options = self.config.get("stream_options", {}).get(self.name, {})
         if clauses := stream_options.get("custom_where_clauses"):
             query = query.where(*(sa.text(clause.strip()) for clause in clauses))
-
-        if self.ABORT_AT_RECORD_COUNT is not None:
-            # Limit record count to one greater than the abort threshold. This ensures
-            # `MaxRecordsLimitException` exception is properly raised by caller
-            # `Stream._sync_records()` if more records are available than can be
-            # processed.
-            query = query.limit(self.ABORT_AT_RECORD_COUNT + 1)
-
-        if self.max_record_count():
-            query = query.limit(self.max_record_count())
-
         return query
 
-    # Get records from stream
-    def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
-        """Return a generator of record-type dictionary objects.
-
-        If the stream has a replication_key value defined, records will be sorted by the
-        incremental key. If the stream also has an available starting bookmark, the
-        records will be filtered for values greater than or equal to the bookmark value.
-
-        Args:
-            context: If partition context is provided, will read specifically from this
-                data slice.
-
-        Yields:
-            One dict per record.
-
-        Raises:
-            NotImplementedError: If partition is passed in context and the stream does
-                not support partitioning.
-        """
-        if context:
-            msg = f"Stream '{self.name}' does not support partitioning."
-            raise NotImplementedError(msg)
-
-        with self.connector._connect() as conn:
-            for record in conn.execute(self.build_query(context)).mappings():
-                # TODO: Standardize record mapping type
-                # https://github.com/meltano/sdk/issues/2096
-                transformed_record = self.post_process(dict(record))
-                if transformed_record is None:
-                    # Record filtered out during post_process()
-                    continue
-                yield transformed_record
-
 
 class PostgresLogBasedStream(SQLStream):
     """Stream class for Postgres log-based streams."""
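For reference, here is a minimal, hypothetical sketch of the config shape that the retained `custom_where_clauses` handling reads (`stream_options` → stream name → `custom_where_clauses`) and a standalone reproduction of how each raw clause is wrapped in `sa.text()` and ANDed onto the SELECT. The stream name `public-accounts`, the table, and the clause text are made-up examples, not part of this diff.

```python
import sqlalchemy as sa

# Hypothetical tap config; the nesting mirrors what the code above reads:
# config["stream_options"][self.name]["custom_where_clauses"].
config = {
    "stream_options": {
        "public-accounts": {
            "custom_where_clauses": [
                "updated_at >= '2024-01-01'",
                "status <> 'archived'",
            ],
        },
    },
}

# Standalone equivalent of the query.where(...) line in the diff: each raw
# clause becomes a sa.text() fragment, and passing them all to .where()
# combines them with AND.
table = sa.table("accounts", sa.column("id"), sa.column("status"))
query = sa.select(table)
clauses = config["stream_options"]["public-accounts"]["custom_where_clauses"]
query = query.where(*(sa.text(clause.strip()) for clause in clauses))
print(query)
# SELECT accounts.id, accounts.status
# FROM accounts
# WHERE updated_at >= '2024-01-01' AND status <> 'archived'
```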