Handling Timeouts with asyncio
Disclaimer: this is my first time experimenting with the asyncio module.
I'm using asyncio.wait in the following manner to try to support a timeout feature waiting for all results from a set of async tasks. This is part of a larger library so I'm omitting some irrelevant code.
Note that the library already supports submitting tasks and using timeouts with ThreadPoolExecutors and ProcessPoolExecutors, so I'm not really interested in suggestions to use those instead or questions about why I'm doing this with asyncio. On to the code...
import asyncio
from contextlib import suppress
...
class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError
At first I was not worrying about cancelling pending tasks on timeout, but then I got the warning "Task was destroyed but it is pending!" on program exit or loop.close. After researching a bit I found multiple ways to cancel tasks and wait for them to actually be cancelled:
Option 1:
[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)
Option 2:
[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))
Option 3:
# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task
Option 4:
Some sort of while loop like in this answer. Seems like my other options are better but including for completeness.
Options 1 and 2 both seem to work fine so far. Either option may be "right", but with asyncio evolving over the years the examples and suggestions around the net are either outdated or vary quite a bit. So my questions are...
Question 1
Are there any practical differences between Options 1 and 2? I know run_until_complete will run until the future has completed, so since Option 1 is looping in a specific order I suppose it could behave differently if earlier tasks take longer to actually complete. I tried looking at the asyncio source code to understand if asyncio.wait just effectively does the same thing with its tasks/futures under the hood, but it wasn't obvious.
Question 2
I assume if one of the tasks is in the middle of a long-running blocking operation it may not actually cancel immediately? Perhaps that just depends on if the underlying operation or library being used will raise the CancelledError right away or not? Maybe that should never happen with libraries designed for asyncio?
Since I'm trying to implement a timeout feature here I'm somewhat sensitive to this. If it's possible these things could take a long time to cancel I'd consider calling cancel and not waiting for it to actually happen, or setting a very short timeout to wait for the cancels to finish.
Question 3
Is it possible loop.run_until_complete (or really, the underlying call to asyncio.wait) returns values in unfinished for a reason other than a timeout? If so I'd obviously have to adjust my logic a bit, but from the docs it seems like that is not possible.
Solution 1:
Are there any practical differences between Options 1 and 2?
No. Option 2 looks nicer and might be marginally more efficient, but their net effect is the same.
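To make the comparison concrete, here is a minimal sketch that runs both options against the same set of tasks and reports which ones end up cancelled. The names worker, cancel_option_1, cancel_option_2 and run_with_timeout are hypothetical helpers invented for this illustration, not part of the library in the question, and the sketch creates its own event loop rather than using asyncio.get_event_loop().

```python
import asyncio
from contextlib import suppress


async def worker(delay):
    # Hypothetical stand-in for a real unit of work.
    await asyncio.sleep(delay)
    return delay


def cancel_option_1(loop, unfinished):
    # Option 1: cancel everything, then drive each task to completion in turn.
    for task in unfinished:
        task.cancel()
    for task in unfinished:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)


def cancel_option_2(loop, unfinished):
    # Option 2: cancel everything, then wait for the whole set at once.
    # asyncio.wait raises ValueError on an empty set, so callers should
    # only use this when there is something left to cancel.
    for task in unfinished:
        task.cancel()
    loop.run_until_complete(asyncio.wait(unfinished))


def run_with_timeout(cancel, timeout=0.2):
    # One short task that finishes in time, two long ones that do not.
    loop = asyncio.new_event_loop()
    try:
        tasks = [loop.create_task(worker(d)) for d in (0.01, 5, 10)]
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(tasks, timeout=timeout)
        )
        if unfinished:
            cancel(loop, unfinished)
        return [task.cancelled() for task in tasks]
    finally:
        loop.close()
```

Calling run_with_timeout(cancel_option_1) and run_with_timeout(cancel_option_2) should report the same cancellation state for every task, and neither should wait for the long sleeps to elapse.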
I know run_until_complete will run until the future has completed, so since Option 1 is looping in a specific order I suppose it could behave differently if earlier tasks take longer to actually complete.
It seems that way at first, but it's not actually the case because loop.run_until_complete runs all tasks submitted to the loop, not just the one passed as argument. It merely stops once the provided awaitable completes - that is what "run until complete" refers to. A loop calling run_until_complete over already scheduled tasks is like the following async code:
ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t
which is in turn semantically equivalent to the following threaded code:
ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()
In other words, await t and run_until_complete(t) block until t has completed, but allow everything else - such as tasks previously scheduled using asyncio.create_task() - to run during that time as well. So the total run time will equal the run time of the longest task, not the sum of all run times. For example, if the first task happens to take a long time, all others will have finished in the meantime, and their awaits won't sleep at all.
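The same effect can be checked directly with run_until_complete from synchronous code. The following is a small timing sketch, assuming short delays so it finishes quickly; time_sequential_run_until_complete is a hypothetical helper name used only for this illustration.

```python
import asyncio
import time


def time_sequential_run_until_complete(delays=(0.1, 0.2, 0.3)):
    loop = asyncio.new_event_loop()
    try:
        # Schedule every sleep as a task up front, so they all make
        # progress whenever the loop is running.
        tasks = [loop.create_task(asyncio.sleep(d)) for d in delays]
        start = time.monotonic()
        # Wait for the tasks one at a time, in order.
        for task in tasks:
            loop.run_until_complete(task)
        return time.monotonic() - start
    finally:
        loop.close()
```

With the default delays the elapsed time should be close to 0.3 seconds (the longest task), not 0.6 seconds (the sum).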
All this only applies to awaiting tasks that have been previously scheduled. If you try to apply that to coroutines, it won't work:
# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)
# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t
# also 55s
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    t.join()
This is often a sticking point for asyncio beginners, who write code equivalent to that last asyncio example and expect it to run in parallel.
I tried looking at the asyncio source code to understand if asyncio.wait just effectively does the same thing with its tasks/futures under the hood, but it wasn't obvious.
asyncio.wait is just a convenience API that does two things:
- converts the input arguments to something that implements Future. For coroutines that means that it submits them to the event loop, as if with create_task, which allows them to run independently. If you give it tasks to begin with, as you do, this step is skipped. (Since Python 3.11, asyncio.wait no longer accepts bare coroutines at all, so they must be wrapped in tasks before the call.)
- uses add_done_callback to be notified when the futures are done, at which point it resumes its caller.
So yes, it does the same things, but with a different implementation because it supports many more features.
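The callback mechanism can be sketched in a few lines. This is a deliberately simplified illustration, not the actual asyncio.wait implementation: simple_wait_all and demo are hypothetical names, and the sketch ignores timeouts, return_when, and cleanup when the caller itself is cancelled.

```python
import asyncio


async def simple_wait_all(tasks):
    # Simplified illustration only; the real asyncio.wait does much more.
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    remaining = len(tasks)

    def on_done(_task):
        # Called by the event loop each time one of the tasks finishes.
        nonlocal remaining
        remaining -= 1
        if remaining == 0 and not waiter.done():
            waiter.set_result(None)  # resumes the caller below

    for task in tasks:
        task.add_done_callback(on_done)
    if remaining:
        # Suspend until the last callback fires; other tasks keep running.
        await waiter
    return set(tasks)


async def demo():
    tasks = [
        asyncio.create_task(asyncio.sleep(0.01 * i, result=i))
        for i in range(1, 4)
    ]
    done = await simple_wait_all(tasks)
    return sorted(task.result() for task in done)
```

Running asyncio.run(demo()) collects the results of all three tasks once the last done-callback has fired.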
I assume if one of the tasks is in the middle of a long-running blocking operation it may not actually cancel immediately?
In asyncio there shouldn't be "blocking" operations, only those that suspend, and they should be cancelled immediately. The exception to this is blocking code tacked onto asyncio with run_in_executor, where the underlying operation won't cancel at all, but the asyncio coroutine will immediately get the exception.
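The run_in_executor case can be observed with a small experiment. This is a sketch under stated assumptions: blocking_work stands in for any blocking call, demo_executor_cancel is a hypothetical helper, and the default thread pool executor is used.

```python
import asyncio
import threading
import time


def demo_executor_cancel(block_for=0.3):
    finished = threading.Event()

    def blocking_work():
        # Stand-in for blocking code that knows nothing about asyncio.
        time.sleep(block_for)
        finished.set()

    async def main():
        loop = asyncio.get_running_loop()

        async def wrapper():
            await loop.run_in_executor(None, blocking_work)

        task = asyncio.create_task(wrapper())
        await asyncio.sleep(0.05)  # give the worker thread time to start
        task.cancel()
        start = time.monotonic()
        try:
            await task
        except asyncio.CancelledError:
            pass
        waited = time.monotonic() - start
        # At this point the coroutine is cancelled, but the thread is not.
        return task.cancelled(), waited, finished.is_set()

    cancelled, waited, done_at_cancel = asyncio.run(main())
    # The blocking function still runs to completion in its thread.
    eventually_done = finished.wait(timeout=block_for + 5)
    return cancelled, waited, done_at_cancel, eventually_done
```

The task reports itself as cancelled almost immediately, while the blocking function keeps running in its thread and only finishes later. For a timeout feature this means the caller regains control quickly, but the work itself is not actually stopped.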
Perhaps that just depends on if the underlying operation or library being used will raise the CancelledError right away or not?
The library doesn't raise CancelledError, it receives it at the await point where it happened to suspend before cancellation occurred. For the library the effect of the cancellation is await ... interrupting its wait and immediately raising CancelledError. Unless caught, the exception will propagate through function and await calls all the way to the top-level coroutine, whose raising CancelledError marks the whole task as cancelled. Well-behaved asyncio code will do just that, possibly using finally to release OS-level resources it holds. When CancelledError is caught, the code can choose not to re-raise it, in which case cancellation is effectively ignored.
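Both behaviours are easy to reproduce. In the sketch below, well_behaved, stubborn and cancel_and_report are hypothetical names chosen for illustration: the first coroutine lets CancelledError propagate after cleaning up, the second swallows it.

```python
import asyncio


async def well_behaved(log):
    try:
        await asyncio.sleep(10)
    finally:
        # Runs during cancellation; CancelledError keeps propagating.
        log.append("cleanup")


async def stubborn(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Swallowing the exception means the cancellation is ignored.
        log.append("ignored cancel")
    return "finished anyway"


async def cancel_and_report(coro_fn):
    log = []
    task = asyncio.create_task(coro_fn(log))
    await asyncio.sleep(0.01)  # let the task start and suspend at its await
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        result = None
    return task.cancelled(), result, log
```

For well_behaved the task ends up cancelled after its cleanup ran. For stubborn, task.cancelled() is False and the task returns a normal result, so a caller that waits for such a task after cancel() is relying on the coroutine to finish on its own.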
Is it possible loop.run_until_complete (or really, the underlying call to asyncio.wait) returns values in unfinished for a reason other than a timeout?
If you're using return_when=asyncio.ALL_COMPLETED (the default), that shouldn't be possible. With return_when=asyncio.FIRST_COMPLETED, on the other hand, unfinished tasks are the normal case and can be returned regardless of any timeout.
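A short sketch of the FIRST_COMPLETED case, with no timeout involved at all. The function name pending_without_timeout is hypothetical, and the sleep durations are arbitrary.

```python
import asyncio


async def pending_without_timeout():
    tasks = [
        asyncio.create_task(asyncio.sleep(0.01, result="fast")),
        asyncio.create_task(asyncio.sleep(5, result="slow")),
    ]
    # No timeout is given, yet wait() returns as soon as one task is done.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    results = [task.result() for task in done]
    # Cancel the leftovers and wait for the cancellation to take effect.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.wait(pending)
    return results, len(pending)
```

Here the slow task is returned in the pending set even though no timeout expired, so logic of the form "unfinished implies timeout" only holds with the default return_when.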