
CancelledError not correctly handled in bind_execute_many() (was: unusual asyncio-related deadlock seen with gather() where one or more tasks are failing) #1265

@zzzeek

Description

Hey all, SQLAlchemy guy here. We have a user who submitted a complex deadlock example using asyncpg, and after much effort I've managed to eliminate SQLAlchemy entirely as a factor: I can reproduce the problem with a very simple script using asyncpg alone. I'm not really able to figure out what's actually happening here, as this gets into the usual realm of asyncio tasks and cancellations (see the patch provided in the next comment, where the CancelledError should likely be handled). To isolate this as an asyncpg issue, I also reproduced it with the psycopg asyncio driver, which behaves differently and slightly better (though it still shows an odd timeout).

The script runs a long executemany() INSERT on three separate connections, each in an individual task passed to asyncio.gather(). A fourth task does nothing but raise an exception immediately. Once the fourth task fails, the first three tasks are interrupted as asyncio.gather() attempts to cancel all tasks. However, asyncpg does not respond to the cancellation and hangs instead, whereas the psycopg asyncio driver pauses for about 12 seconds while the tasks are being cancelled, but then the exception raises.
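
The cancellation dynamic here can be shown with plain asyncio and no database at all (a minimal sketch, not part of the repro): when one task in a gather() fails, the remaining tasks receive CancelledError during asyncio.run() teardown, and a well-behaved awaitable has to let that cancellation propagate.

```python
import asyncio

cancelled = []

async def slow(i):
    try:
        await asyncio.sleep(30)
    except asyncio.CancelledError:
        cancelled.append(i)  # note the cancellation, then re-raise it
        raise

async def boom():
    raise ValueError("expected failure")

async def main():
    await asyncio.gather(slow(0), slow(1), boom())

try:
    asyncio.run(main())
except ValueError as exc:
    caught = exc

# both slow tasks get cancelled during loop teardown, and the original
# ValueError still propagates out of asyncio.run()
```

A database driver awaiting network I/O sits in the position of slow() here: if it absorbs the CancelledError instead of re-raising, the teardown never completes.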

I'm running this against PostgreSQL 16.9 on localhost; the user is running PostgreSQL 17.x. The output with psycopg looks like:

Running with psycopg
Starting task 0
Starting task 1
Starting task 2
Failing - with psycopg, we expect the script to pause for about 12 seconds before raising ValueError
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
the long executemany() call was interrupted with: CancelledError()
Traceback (most recent call last):
  File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
    await asyncio.gather(*tasks)
  File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
    raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected

With asyncpg, by contrast, the script hangs until ctrl-C is pressed; the traceback after ctrl-C is shown below:

Running with asyncpg
Starting task 0
Starting task 1
Starting task 2
Failing - with asyncpg, we expect the script to hang indefinitely until ctrl-C is used
^CTraceback (most recent call last):
  File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/home/classic/dev/sqlalchemy/test4.py", line 120, in main
    await asyncio.gather(*tasks)
  File "/home/classic/dev/sqlalchemy/test4.py", line 95, in crashing_task
    raise ValueError("Crashing task failed as expected")
ValueError: Crashing task failed as expected

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/classic/dev/sqlalchemy/test4.py", line 124, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 194, in run
    with Runner(debug=debug, loop_factory=loop_factory) as runner:
         ~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 62, in __exit__
    self.close()
    ~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 70, in close
    _cancel_all_tasks(loop)
    ~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib64/python3.13/asyncio/runners.py", line 206, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 706, in run_until_complete
    self.run_forever()
    ~~~~~~~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 677, in run_forever
    self._run_once()
    ~~~~~~~~~~~~~~^^
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 1996, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib64/python3.13/selectors.py", line 452, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<long_commit_task() running at /home/classic/dev/sqlalchemy/test4.py:66> wait_for=<Future pending cb=[BaseProtocol._on_waiter_completed(), Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820, gather.<locals>._done_callback() at /usr/lib64/python3.13/asyncio/tasks.py:820]>
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()
the long executemany() call was interrupted with: GeneratorExit()

For both versions, pg_stat_activity doesn't show any deadlock; instead the backends are all waiting in ClientRead:

test=> SELECT pid, wait_event, query, query_start, state
FROM pg_stat_activity
WHERE state IS NOT NULL AND pid != PG_BACKEND_PID();
   pid   | wait_event |                             query                             |          query_start          | state  
---------+------------+---------------------------------------------------------------+-------------------------------+--------
 2072934 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526354-04 | active
 2072936 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526412-04 | active
 2072938 | ClientRead | INSERT INTO lotsa_rows_table (id, task_index) VALUES ($1, $2) | 2025-07-11 00:31:20.526307-04 | active
(3 rows)
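
The ClientRead wait state means the server is idle, waiting for the client to send more data, so this points at the cancellation being absorbed client-side rather than a database deadlock. The failure mode can be sketched with no driver at all (illustrative only, this is not asyncpg's actual code): a coroutine that catches CancelledError and goes back to waiting makes task.cancel() ineffective, which from gather()'s point of view is an indefinite hang.

```python
import asyncio

async def stubborn():
    # bug pattern: the first CancelledError is swallowed and the
    # coroutine resumes waiting, so the task survives cancellation
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await asyncio.sleep(3600)  # keeps waiting instead of re-raising

async def main():
    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)        # let the task reach its first await
    task.cancel()
    done, pending = await asyncio.wait({task}, timeout=0.1)
    survived = task in pending    # the cancel was absorbed
    task.cancel()                 # a second cancel lands, ending the demo
    try:
        await task
    except asyncio.CancelledError:
        pass
    return survived

survived = asyncio.run(main())
```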

I've made this script as straightforward and obvious as possible, and I can confirm that the patch provided in the next comment resolves the issue.

import asyncio
import contextlib
import uuid

# this script illustrates identical sequences with asyncpg and the
# psycopg async driver.  The steps here are heavily refined from a
# user-submitted script using SQLAlchemy; I've managed to reduce the
# issue down to a few simultaneous connections running in tasks within
# a gather() call, plus one task that raises an exception.  The issue
# does not seem to be related to database deadlocks.

# set to False to use asyncio psycopg instead
USE_ASYNCPG = True

# adjust URL for desired connection
URL = "postgresql://scott:tiger@localhost/test"

# when asyncpg is used, the script reaches the failing task and hangs
# indefinitely until ctrl-C is pressed.
#
# when using psycopg, the script reaches the failing task, there's a delay
# of about 12 seconds, then the executemany() calls are cancelled with
# CancelledError() and the ValueError is thrown.


async def make_a_connection():
    """make a new async DB connection from either asyncpg or psycopg"""

    if USE_ASYNCPG:
        import asyncpg

        return await asyncpg.connect(URL)
    else:
        import psycopg

        connection = await psycopg.AsyncConnection.connect(URL)
        # using autocommit to help indicate this does not seem to be a DB
        # deadlock
        await connection.set_autocommit(True)
        return connection


@contextlib.asynccontextmanager
async def execute_on(connection):
    """produce a connection or cursor with an execute()/executemany() method.

    The asyncpg and psycopg APIs differ in this regard; this function
    abstracts away that detail.

    """

    if USE_ASYNCPG:
        yield connection
    else:
        cursor = connection.cursor()
        yield cursor
        await cursor.close()


async def long_commit_task(task_index, connection):
    """runs a large executemany() INSERT statement on the given connection."""

    print(f"Starting task {task_index}")
    try:
        async with execute_on(connection) as conn:
            await conn.executemany(
                "INSERT INTO lotsa_rows_table (id, task_index) "
                + (
                    "VALUES ($1::UUID, $2::INTEGER)"
                    if USE_ASYNCPG
                    else "VALUES (%s, %s)"
                ),
                [(uuid.uuid4(), task_index) for i in range(100000)],
            )
    except BaseException as be:
        print(f"the long executemany() call was interrupted with: {be!r}")
        raise
    print(f"Task {task_index} Done")


async def crashing_task():
    """raises an error.."""

    if USE_ASYNCPG:
        print(
            "Failing - with asyncpg, we expect the "
            "script to hang indefinitely until ctrl-C is used"
        )
    else:
        print(
            "Failing - with psycopg, we expect the "
            "script to pause for about 12 seconds before raising ValueError"
        )

    raise ValueError("Crashing task failed as expected")


async def main():
    print(f"Running with {'asyncpg' if USE_ASYNCPG else 'psycopg'}")

    connection = await make_a_connection()
    async with execute_on(connection) as conn:
        await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
        await conn.execute(
            "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
        )
    await connection.close()

    # create three tasks that each insert a large number of rows
    tasks = [
        asyncio.create_task(
            long_commit_task(i, connection=(await make_a_connection())),
        )
        for i in range(3)
    ]

    # create one more task that fails immediately
    tasks.append(asyncio.create_task(crashing_task()))

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
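
Per the issue's updated title, the problem sits in asyncpg's bind_execute_many() handling of CancelledError. The general shape of such a fix (an illustrative pattern only, not asyncpg's actual code; run_protocol_op and waiter are hypothetical names) is to catch the cancellation around the protocol waiter, tidy up the pending state, and re-raise so the caller's gather() and the loop teardown can finish:

```python
import asyncio

async def run_protocol_op(waiter: asyncio.Future):
    """Hypothetical driver method awaiting a protocol-level future."""
    try:
        return await waiter
    except asyncio.CancelledError:
        # clean up rather than leaving the connection mid-operation...
        if not waiter.done():
            waiter.cancel()
        raise  # ...and crucially re-raise: swallowing this is the hang

async def demo():
    waiter = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(run_protocol_op(waiter))
    await asyncio.sleep(0)   # let the task start awaiting the waiter
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()  # the cancellation propagated cleanly

result = asyncio.run(demo())
```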
