Description
hey all, SQLAlchemy guy here. We have a user who submitted a complex deadlock example using asyncpg, and after much effort I've managed to eliminate SQLAlchemy as a factor entirely: I can reproduce the problem with a very simple script using asyncpg alone. I'm not really able to figure out what's actually happening here, as this gets into the usual realm of asyncio tasks and cancellations (see the patch provided, where the CancelledError should likely be handled). To help isolate this as an asyncpg issue, I also reproduced the problem with the psycopg asyncio driver, which behaves differently and slightly better (though it still shows an odd ~12-second delay).
The script runs a long-running INSERT statement on three separate connections, each in an individual task sent to asyncio.gather(). A fourth task does nothing but raise an exception immediately. Once the fourth task raises, the exception propagates out of main() and the first three tasks are cancelled during asyncio.run() teardown. However, asyncpg does not respond to the cancellation and hangs instead, whereas the psycopg asyncio driver pauses for about 12 seconds while the tasks are being cancelled, and then the exception raises.
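The cancellation mechanics at play here can be seen in isolation with no database at all. Below is a minimal sketch (names are mine, not from the report): a well-behaved long-running await receives CancelledError when asyncio.run() tears down pending tasks after the crashing task's exception propagates, and re-raising it is what lets the task actually finish cancelling:

```python
import asyncio

log = []

async def long_running():
    # stands in for the long executemany(): a single cancellable await point
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # a well-behaved coroutine notes the cancellation and re-raises,
        # which allows the task to complete its cancellation
        log.append("cancelled")
        raise

async def crasher():
    raise ValueError("Crashing task failed as expected")

async def main():
    await asyncio.gather(long_running(), crasher())

try:
    asyncio.run(main())
except ValueError:
    pass

# the sleeper was cancelled during teardown rather than left hanging
print(log)
```

The hang in the asyncpg case suggests that this CancelledError delivery is not reaching (or not being honored by) the driver's internal wait.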
I'm running this against PostgreSQL 16.9 on localhost; the user is running PG 17-something. The output with psycopg looks like:
Running with psycopg
Starting task 0
Starting task 1
Starting task 2
Failing - with psycopg, we expect the script to pause for about 12 seconds before raising ValueError
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
Traceback (most recent call last):
File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
asyncio.run(main())
~~~~~~~~~~~^^^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
await asyncio.gather(*tasks)
File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected
Whereas with asyncpg, it hangs until Ctrl-C is pressed. The trace after Ctrl-C is shown below:
Running with asyncpg
Starting task 0
Starting task 1
Starting task 2
Failing - with asyncpg, we expect the script to hang indefinitely until ctrl-C is used
^CTraceback (most recent call last):
File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
await asyncio.gather(*tasks)
File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
asyncio.run(main())
~~~~~~~~~~~^^^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 194, in run
with Runner(debug=debug, loop_factory=loop_factory) as runner:
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 62, in __exit__
self.close()
~~~~~~~~~~^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 70, in close
_cancel_all_tasks(loop)
~~~~~~~~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 206, in _cancel_all_tasks
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 706, in run_until_complete
self.run_forever()
~~~~~~~~~~~~~~~~^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 677, in run_forever
self._run_once()
~~~~~~~~~~~~~~^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 1996, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib64/python3.13/selectors.py", line 452, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()
For both versions, pg_stat_activity doesn't show any deadlock, and instead shows the three backends waiting on ClientRead:
test=> SELECT pid, wait_event, query, query_start, state
FROM pg_stat_activity
WHERE state IS NOT NULL AND pid != PG_BACKEND_PID();
pid | wait_event | query | query_start | state
---------+------------+---------------------------------------------------------------+-------------------------------+--------
2072934 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526354-04 | active
2072936 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526412-04 | active
2072938 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526307-04 | active
(3 rows)
I've made this script as straightforward and obvious as possible, and I can confirm that the patch provided in the next comment resolves the issue.
import asyncio
import contextlib
import uuid

# this script illustrates identical sequences with asyncpg and the
# psycopg async driver. The steps here are heavily refined from a
# user-submitted script using SQLAlchemy, however here I've managed to
# reduce the issue down to just having a few simultaneous connections
# within tasks in a gather() call, while also having an exception raised
# from one task. The issue does not seem to be related to database deadlocks.

# set to False to use asyncio psycopg instead
USE_ASYNCPG = True

# adjust URL for desired connection
URL = "postgresql://scott:tiger@localhost/test"

# when asyncpg is used, the script reaches the failing task and hangs
# indefinitely until ctrl-C is pressed.
#
# when using psycopg, the script reaches the failing task, there's a delay
# of about 12 seconds, then the executemany() calls are cancelled with
# CancelledError() and the ValueError is thrown.


async def make_a_connection():
    """make a new async DB connection from either asyncpg or psycopg"""
    if USE_ASYNCPG:
        import asyncpg

        return await asyncpg.connect(URL)
    else:
        import psycopg

        connection = await psycopg.AsyncConnection.connect(URL)

        # using autocommit to help indicate this does not seem to be a DB
        # deadlock
        await connection.set_autocommit(True)
        return connection


@contextlib.asynccontextmanager
async def execute_on(connection):
    """produce a connection or cursor with an execute()/executemany() method.

    The asyncpg and psycopg APIs differ in this regard; this function abstracts
    away that detail.
    """
    if USE_ASYNCPG:
        yield connection
    else:
        cursor = connection.cursor()
        yield cursor
        await cursor.close()


async def long_commit_task(task_index, connection):
    """runs a large executemany() INSERT statement on the given connection."""
    print(f"Starting task {task_index}")
    try:
        async with execute_on(connection) as conn:
            await conn.executemany(
                "INSERT INTO lotsa_rows_table (id, task_index) "
                + (
                    "VALUES ($1::UUID, $2::INTEGER)"
                    if USE_ASYNCPG
                    else "VALUES (%s, %s)"
                ),
                [(uuid.uuid4(), task_index) for i in range(100000)],
            )
    except BaseException as be:
        print(f"the long executemany() call was interrupted with: {be!r}")
        raise

    print(f"Task {task_index} Done")


async def crashing_task():
    """raises an error immediately."""
    if USE_ASYNCPG:
        print(
            "Failing - with asyncpg, we expect the "
            "script to hang indefinitely until ctrl-C is used"
        )
    else:
        print(
            "Failing - with psycopg, we expect the "
            "script to pause for about 12 seconds before raising ValueError"
        )
    raise ValueError("Crashing task failed as expected")


async def main():
    print(f"Running with {'asyncpg' if USE_ASYNCPG else 'psycopg'}")

    connection = await make_a_connection()
    async with execute_on(connection) as conn:
        await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
        await conn.execute(
            "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
        )
    await connection.close()

    # create three tasks that each insert a large number of rows
    tasks = [
        asyncio.create_task(
            long_commit_task(i, connection=(await make_a_connection())),
        )
        for i in range(3)
    ]

    # create one more task that fails immediately
    tasks.append(asyncio.create_task(crashing_task()))

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
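Not from the report, but worth noting as a defensive pattern while a driver-level fix is pending: the application can cancel the remaining tasks explicitly and put an upper bound on how long shutdown waits for them, so a driver that never acknowledges cancellation cannot hang the process indefinitely. A rough sketch under my own assumptions (`run_all` and the 5-second timeout are hypothetical, not part of any library API):

```python
import asyncio


async def run_all(coros, cleanup_timeout=5.0):
    """Run coroutines concurrently; on the first failure, cancel the
    rest and wait at most cleanup_timeout for them to unwind."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        for t in tasks:
            t.cancel()
        # bound the wait so a task that ignores cancellation
        # (e.g. stuck inside a driver) can't hang shutdown forever
        await asyncio.wait(tasks, timeout=cleanup_timeout)
        raise


# usage sketch: one healthy task plus one that fails immediately
async def main():
    async def ok():
        await asyncio.sleep(0.01)
        return "ok"

    async def bad():
        raise ValueError("boom")

    try:
        await run_all([ok(), bad()])
    except ValueError as e:
        return repr(e)


result = asyncio.run(main())
print(result)
```

This doesn't fix the underlying problem (the stuck task is abandoned rather than cleaned up), but it keeps the failure visible instead of hanging at the gather() call.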