Description
Greetings!
Please consider the following code. It is a conceptual model of a reactive chain that reads a stream of DDD aggregates (in a real app I would be better off doing the same via Spring CRUD repositories). The first select retrieves the aggregate roots (users); the selects issued from the downstream retrieve related entities (accounts):
Flux.defer {
    // The outer select streams the aggregate roots.
    databaseClient.sql("select userId from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 256)
        .flatMap { r -> r.map { row, _ -> row.get(0).toString().toInt() } }
        // The downstream emits one additional select per userId.
        .flatMap { userId ->
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", userId)
                .flatMap { r -> r.map { row, _ -> row.get(0, String::class.java) } }
        }
}.`as`(transactionalOperator::transactional)
Problem
Such a chain can read at most 255 users. If the number exceeds that, an exception ("Cannot exchange messages because the request queue limit is exceeded") is thrown, which is frustrating, especially for spring-data-crud-repository users. Apparently, anyone who wants to read a stream of (DDD) aggregates via multiple crud-repositories has to resort either to pagination (and carefully calculate all the requests the downstream might emit) or to joins; the former has nothing to do with the reactive approach (we want streams, not pages).
The reason for the aforementioned exception is head-of-line blocking: the conversation for "select userId from users" cannot be removed from the conversations queue (io.r2dbc.postgresql.client.ReactorNettyClient) until Postgres's CommandComplete is received and the portal is closed thereafter; meanwhile, the conversations for the downstream requests keep queueing up until the limit is reached and the exception is thrown.
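A minimal sketch of that mechanism (purely illustrative, not ReactorNettyClient's actual code; the limit of 256 and all names below are assumptions):
// The head conversation stays queued until CommandComplete arrives, while every
// row it emits enqueues one more downstream conversation behind it.
fun main() {
    val requestQueueLimit = 256
    val conversations = ArrayDeque<String>()
    conversations.addLast("select userId from users") // head, awaiting CommandComplete
    for (userId in 1..300) {
        check(conversations.size < requestQueueLimit) {
            "Cannot exchange messages because the request queue limit is exceeded"
        }
        conversations.addLast("select login from accounts where userId=$userId")
    }
}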
Workaround
At the same time, the task is doable. The following code (employing a cursor) is capable of reading an infinite stream of users and emitting an additional request for each of them:
@Transactional(isolation = Isolation.REPEATABLE_READ, readOnly = true)
open fun browseUserIds(transformer: (Int) -> Mono<UserEntity>): Flux<UserEntity> =
    Flux.usingWhen(
        // Acquire: declare a uniquely named cursor (the name is taken from the context).
        Mono.deferContextual { Mono.just(it.get("cursorId") as String) }
            .flatMap { cursor ->
                client.sql("DECLARE $cursor CURSOR FOR SELECT userId FROM users;")
                    .flatMap { Flux.just(cursor) }
                    .last()
            },
        { cursor ->
            Flux.generate { sink -> sink.next(1) }
                .concatMap {
                    client.sql("FETCH FORWARD $it FROM $cursor;")
                        .flatMap { r -> r.map { row, _ -> row.get("userId")!!.toString().toInt() } }
                        // POISON_PILL is a sentinel Int that can never be a real userId.
                        .switchIfEmpty(Mono.just(POISON_PILL))
                }
                .takeWhile { it != POISON_PILL }
                // Emit an arbitrary number of selects for the next userId via crud-repositories/templates.
                .concatMap { transformer.invoke(it) }
        },
        // Release: close the cursor.
        { cursor -> client.sql("CLOSE $cursor;").then() }
    ).contextWrite(
        Context.of("cursorId", "uc_${UUID.randomUUID().toString().replace("-", "")}")
    )
The key difference is that the driver treats every cursor-related instruction as a separate conversation, and that suspension is intrinsic to portals created via DECLARE CURSOR; together these make it possible to send additional (conventional) selects inside "transformer.invoke(it)".
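For completeness, a call site might look like this sketch (userDao, accountEntityRepository and the UserEntity constructor here are illustrative assumptions, not part of the code above):
// Hypothetical call site: each streamed userId triggers additional conventional
// selects through a reactive crud-repository; since each FETCH is its own
// conversation, the extra requests no longer pile up behind a blocked head.
val users: Flux<UserEntity> = userDao.browseUserIds { userId ->
    accountEntityRepository.findAllByUserId(userId)
        .collectList()
        .map { accounts -> UserEntity(userId, accounts) }
}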
Possible solution
At the same time, according to this answer in pgsql-hackers (https://www.postgresql.org/message-id/1915c800-2c49-4039-a840-7cafc0654fe4%40iki.fi), there is no difference (for the backend) between a portal created via DECLARE CURSOR and one created via Bind.
So could the following approach pay off? Whenever Postgres's PortalSuspended is received for the Nth conversation (a fetch size has to be defined for the corresponding query, which is already possible), the driver goes about executing the subsequent conversations (generated by the downstream, if any) and resumes the Nth conversation (via a new Execute command) only after all subsequent conversations are done. Thereby the following simple code, which a lot of developers tend to write, could work (now it cannot, owing to the aforementioned reasons):
@Transactional(isolation = Isolation.REPEATABLE_READ, readOnly = true)
override fun browseUsers(): Flow<User> =
    // Not necessarily all but some considerable amount
    crudUserEntityRepository.findAll().map { user ->
        User(
            user.userId,
            user.userName,
            user.email,
            crudAccountEntityRepository.findAllByUserId(user.userId)
                .map { Account(it.login, it.password) }.toSet(),
            crudPhoneEntityRepository.findAllByUserId(user.userId)
                .map { Phone(it.number) }.toSet()
        )
    }
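For reference, defining a per-query fetch size is already possible, e.g. via Spring's DatabaseClient; a sketch (with a fetch size set, the driver issues Execute with a row limit, and the backend answers each exhausted batch with PortalSuspended instead of CommandComplete):
// The proposal above only adds new behaviour while a portal is suspended;
// requesting the suspension itself is already expressible today.
databaseClient.sql("select userId from users")
    .filter { statement -> statement.fetchSize(64) } // io.r2dbc.spi.Statement#fetchSize
    .flatMap { r -> r.map { row, _ -> row.get("userId").toString().toInt() } }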
Motivation
In my eyes it would be a step change for spring-data users, because it seems that the only way to read a stream of complex objects nowadays is either the aforementioned approach (employing a cursor) or joins. The former entails undue complexity, the latter has a lot of disadvantages:
- joining several tables may be quite a task itself,
- tons of boilerplate code are inherent in dealing with joins,
- the more tables one joins, the higher the chance that PostgreSQL ends up with an inefficient plan.
Thanks!