Confusing asyncio task cancellation behavior

I'm confused by the behavior of the asyncio code below:

import time
import asyncio
from threading import Thread
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

event_loop = None
q = None

# queue items processing
async def _main():
    global event_loop, q
    q = asyncio.Queue(maxsize=5)
    event_loop = asyncio.get_running_loop()
    try:
        while True:
            try:
                new_data = await asyncio.wait_for(q.get(), timeout=1)
                logger.info(new_data)
                q.task_done()
            except asyncio.TimeoutError:
                logger.warning(f'timeout - main cancelled? {asyncio.current_task().cancelled()}')
    except asyncio.CancelledError:
        logger.warning('cancelled')
        raise

def _event_loop_thread():
    try:
        asyncio.run(_main(), debug=True)
    except asyncio.CancelledError:
        logger.warning('main was cancelled')

thread = Thread(target=_event_loop_thread)
thread.start()

# wait for the event loop to start
while not event_loop:
    time.sleep(0.1)

async def _push(a):
    try:
        try:
            await q.put(a)
            await asyncio.sleep(0.1)
        except asyncio.QueueFull:
            logger.warning('q full')
    except asyncio.CancelledError:
        logger.warning('push cancelled')
        raise

# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)

pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)

logger.info('finished')

Which produces the following output:

INFO:__main__:killing task <coroutine object _main at 0x7f7ff05d6a40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd17140>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd0fbc0>
INFO:__main__:killing task <coroutine object Queue.get at 0x7f7fefd7dd40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd170c0>
INFO:__main__:finished
INFO:__main__:processed 0
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
INFO:__main__:processed 1
INFO:__main__:processed 2
INFO:__main__:processed 3
INFO:__main__:processed 4
INFO:__main__:processed 5
INFO:__main__:processed 6
INFO:__main__:processed 7
INFO:__main__:processed 8
INFO:__main__:processed 9
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False

Why does the _main() coro never get cancelled? I've looked through the asyncio documentation and haven't found anything that hints at what might be going on.

Furthermore, if you replace the line:

new_data = await asyncio.wait_for(q.get(), timeout=1)

With:

new_data = await q.get()

Things behave as expected: _main() and all the other tasks get properly cancelled. So it seems to be a problem with asyncio.wait_for().

What I'm trying to do here is build a producer/consumer model where the consumer is the _main() task in the asyncio event loop (running in a separate thread) and the main thread is the producer (using _push()).

Thanks


Solution 1:

Unfortunately you have stumbled on an outstanding bug in the asyncio package: https://bugs.python.org/issue42130. As you observe, asyncio.wait_for can suppress a CancelledError under some circumstances. This occurs when the awaitable passed to wait_for has actually finished when the cancellation occurs; wait_for then returns the awaitable's result without propagating the cancellation. (I also learned about this the hard way.)
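The race is narrow, but it can be provoked fairly deterministically. In the sketch below (not your code), inner() completes on its very first step, and outer() is cancelled in exactly the window where wait_for already holds inner's result but has not resumed yet. On affected versions (roughly Python 3.8 through 3.11) the cancellation is swallowed and the result is returned; on later versions the CancelledError propagates, so the printed outcome depends on your interpreter:

```python
import asyncio

async def inner():
    return "result"          # completes on its very first step

async def outer():
    # If outer is cancelled while inner's result is already available but
    # wait_for has not resumed yet, buggy versions return the result
    # instead of propagating CancelledError.
    return await asyncio.wait_for(inner(), timeout=1)

async def main():
    task = asyncio.create_task(outer())
    await asyncio.sleep(0)   # outer enters wait_for; the inner task is scheduled
    await asyncio.sleep(0)   # inner completes; wait_for is about to resume
    task.cancel()
    try:
        await task
        return "cancellation swallowed"
    except asyncio.CancelledError:
        return "cancelled as expected"

print(asyncio.run(main()))
```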

The only available fix at the moment (as far as I know) is to avoid using wait_for in any coroutine that can be cancelled. Perhaps in your case you can simply await q.get() and not worry about the possibility of a timeout. (For what it's worth, wait_for was later reimplemented on top of asyncio.timeout in Python 3.12, which resolves this behavior.)
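If you do need the timeout, one workaround is to build it on asyncio.wait, which reports completion via (done, pending) sets rather than intercepting cancellation. get_with_timeout below is a hypothetical helper, not an asyncio API, just a sketch of the idea:

```python
import asyncio

async def get_with_timeout(q: asyncio.Queue, timeout: float):
    """A wait_for(q.get(), timeout) replacement that never swallows cancellation."""
    getter = asyncio.ensure_future(q.get())
    try:
        done, _pending = await asyncio.wait({getter}, timeout=timeout)
    except asyncio.CancelledError:
        getter.cancel()   # don't leak the pending q.get()
        raise             # propagate the cancellation unconditionally
    if getter in done:
        return getter.result()
    getter.cancel()
    raise asyncio.TimeoutError

async def demo():
    q = asyncio.Queue()
    await q.put("hello")
    item = await get_with_timeout(q, timeout=1.0)   # -> "hello"
    try:
        await get_with_timeout(q, timeout=0.05)     # empty queue -> times out
    except asyncio.TimeoutError:
        return item, "timed out"

print(asyncio.run(demo()))
```

Note that even in the timeout path the pending q.get() is cancelled explicitly, so no item can be silently consumed by an orphaned getter.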

I would like to point out, in passing, that your program is seriously non-deterministic. What I mean is that you are not synchronizing the activity between the two threads - and that has some strange consequences. Did you notice, for example, that you created 10 tasks based on the _push coroutine, yet you only cancelled 3 of them? That happened because you fired off 10 task creations to the second thread:

# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)

but without waiting on any of the returned futures, you immediately started to cancel tasks:

pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)

Apparently the second thread hadn't finished creating all the tasks yet, so your task cancellation logic was hit-and-miss.
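The remedy is to hold on to the concurrent.futures.Future objects that run_coroutine_threadsafe returns and block on them before cancelling anything. A self-contained sketch (simplified names, not your exact code):

```python
import asyncio
import concurrent.futures
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

store = []

async def push(item):
    store.append(item)

# Schedule ten coroutines from the main thread...
futures = [asyncio.run_coroutine_threadsafe(push(i), loop) for i in range(10)]

# ...and block here until every one of them has actually run. Without this
# wait, the main thread races ahead while some tasks may not even exist yet.
concurrent.futures.wait(futures)
print(len(store))   # -> 10

loop.call_soon_threadsafe(loop.stop)
```

Only after concurrent.futures.wait returns is it meaningful to enumerate and cancel the remaining tasks, because at that point you know exactly which tasks exist.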

How CPU time is divided between threads is up to the OS scheduler, so if you want things in different threads to happen in a specific order you must write explicit synchronization logic. When I ran your exact code on my machine (Python 3.10, Windows 10) I got significantly different behavior from what you reported.

This wasn't the real problem, as it turns out, but it's hard to troubleshoot a program that doesn't do the same thing every time.