Confusing asyncio task cancellation behavior
I'm confused by the behavior of the asyncio code below:
import time
import asyncio
from threading import Thread
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

event_loop = None
q = None

# queue items processing
async def _main():
    global event_loop, q
    q = asyncio.Queue(maxsize=5)
    event_loop = asyncio.get_running_loop()
    try:
        while True:
            try:
                new_data = await asyncio.wait_for(q.get(), timeout=1)
                logger.info(new_data)
                q.task_done()
            except asyncio.TimeoutError:
                logger.warning(f'timeout - main cancelled? {asyncio.current_task().cancelled()}')
    except asyncio.CancelledError:
        logger.warning(f'cancelled')
        raise

def _event_loop_thread():
    try:
        asyncio.run(_main(), debug=True)
    except asyncio.CancelledError:
        logger.warning('main was cancelled')

thread = Thread(target=_event_loop_thread)
thread.start()

# wait for the event loop to start
while not event_loop:
    time.sleep(0.1)

async def _push(a):
    try:
        try:
            await q.put(a)
            await asyncio.sleep(0.1)
        except asyncio.QueueFull:
            logger.warning('q full')
    except asyncio.CancelledError:
        logger.warning('push cancelled')
        raise

# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)

pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)
logger.info('finished')
Which produces the following output:
INFO:__main__:killing task <coroutine object _main at 0x7f7ff05d6a40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd17140>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd0fbc0>
INFO:__main__:killing task <coroutine object Queue.get at 0x7f7fefd7dd40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd170c0>
INFO:__main__:finished
INFO:__main__:processed 0
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
INFO:__main__:processed 1
INFO:__main__:processed 2
INFO:__main__:processed 3
INFO:__main__:processed 4
INFO:__main__:processed 5
INFO:__main__:processed 6
INFO:__main__:processed 7
INFO:__main__:processed 8
INFO:__main__:processed 9
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
Why does the _main() coro never get cancelled? I've looked through the asyncio documentation and haven't found anything that hints at what might be going on.
Furthermore, if you replace the line:
    new_data = await asyncio.wait_for(q.get(), timeout=1)
With:
    new_data = await q.get()
Things behave as expected: _main() and all the other tasks get cancelled properly. So it seems to be a problem with asyncio.wait_for().
What I'm trying to do here is have a producer / consumer model where the consumer is the _main() task in the asyncio event loop (running in a separate thread) and the main thread is the producer (using _push()).
Thanks
Solution 1:
Unfortunately you have stumbled on an outstanding bug in the asyncio package: https://bugs.python.org/issue42130. As you observe, asyncio.wait_for can suppress a CancelledError under some circumstances. This occurs when the awaitable passed to wait_for has already finished at the moment the cancellation arrives; wait_for then returns the awaitable's result without propagating the cancellation. (I also learned about this the hard way.)
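The race is easy to reproduce in isolation. Here is a minimal sketch (not your code) that completes the inner future and cancels the outer task in the same event-loop iteration; what it prints depends on whether your interpreter has the bug:

```python
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()

    async def outer():
        try:
            return await asyncio.wait_for(fut, timeout=10)
        except asyncio.CancelledError:
            return 'cancelled'

    task = asyncio.create_task(outer())
    await asyncio.sleep(0)   # let outer() start waiting inside wait_for
    fut.set_result('done')   # the inner future completes...
    task.cancel()            # ...and the task is cancelled in the same iteration
    result = await task
    # on interpreters affected by bpo-42130 this prints 'done' (the
    # cancellation was swallowed); where the bug is fixed it prints 'cancelled'
    print(result)
    return result

asyncio.run(main())
```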
The only available fix at the moment (as far as I know) is to avoid using wait_for in any coroutine that can be cancelled. Perhaps in your case you can simply await q.get() and not worry about the possibility of a timeout.
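If you really do need the timeout, one possible workaround is to build it out of asyncio.wait instead of wait_for. This is a sketch, and get_with_timeout is a hypothetical helper (not part of asyncio); the point is that asyncio.wait never tries to hand you the inner result when the outer coroutine is cancelled, so the CancelledError always propagates:

```python
import asyncio

async def get_with_timeout(q, timeout):
    # hypothetical helper emulating wait_for(q.get(), timeout)
    getter = asyncio.ensure_future(q.get())
    try:
        done, pending = await asyncio.wait({getter}, timeout=timeout)
    except asyncio.CancelledError:
        getter.cancel()   # don't leak the pending q.get() task
        raise
    if getter in done:
        return getter.result()
    getter.cancel()       # timed out: cancel the getter ourselves
    raise asyncio.TimeoutError
```

Unlike wait_for, asyncio.wait does not cancel the inner task on timeout, so the helper has to do that itself.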
I would like to point out, in passing, that your program is seriously non-deterministic. What I mean is that you are not synchronizing the activity between the two threads - and that has some strange consequences. Did you notice, for example, that you created 10 tasks based on the _push coroutine, yet you only cancelled 3 of them? That happened because you fired off 10 task creations to the second thread:
# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)
but without waiting on any of the returned futures, you immediately started to cancel tasks:
pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)
Apparently the second thread hadn't finished creating all the tasks yet, so your task cancellation logic was hit-and-miss.
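Here is a self-contained sketch of the synchronization I mean (with a trivial stand-in for _push, not your original code): run_coroutine_threadsafe returns a concurrent.futures.Future, and blocking on each one with .result() guarantees every push has finished before the cancellation pass begins:

```python
import asyncio
import threading

results = []

async def _push(item):
    # stand-in for the original _push: just record the item
    results.append(item)

def _loop_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
threading.Thread(target=_loop_thread, args=(loop,), daemon=True).start()

futures = [asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), loop)
           for i in range(10)]
for f in futures:
    f.result()   # blocks this thread until that coroutine has run

# only now is it safe to start cancelling / shutting down
loop.call_soon_threadsafe(loop.stop)
```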
Allocating CPU time slices between two threads is an OS function, and if you want things in different threads to happen in a specific order you must write explicit logic. When I ran your exact code on my machine (python3.10, Windows 10) I got significantly different behavior from what you reported.
This wasn't the real problem, as it turns out, but it's hard to troubleshoot a program that doesn't do the same thing every time.