When using asyncio, how do you allow all running tasks to finish before shutting down the event loop?
I have the following code:
@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break
I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run.
This is the error:
task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
How do I schedule a shutdown correctly?
To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the CPU usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive SIGTERM, at which point I stop scheduling, wait for all current jobs to finish, and exit gracefully.
You can retrieve the unfinished tasks and run the loop again until they have finished, then close the loop or exit your program.
# Pass the loop explicitly: all_tasks() without arguments requires a running loop.
pending = asyncio.all_tasks(loop)
loop.run_until_complete(asyncio.gather(*pending))
- pending is the set of tasks that have not finished yet.
- asyncio.gather() lets you wait on several tasks at once.
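A minimal, self-contained sketch of this pattern follows. The names job, scheduler, and finished are hypothetical, the sleep duration is arbitrary, and Python 3.7 or later is assumed.

```python
import asyncio

finished = []  # hypothetical record of completed jobs, for illustration only


async def job(n):
    # Stand-in for a slow operation.
    await asyncio.sleep(0.05)
    finished.append(n)


async def scheduler():
    # Spawn background jobs, then return without waiting for them.
    for n in range(3):
        asyncio.create_task(job(n))


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(scheduler())
    # The loop has stopped, but the spawned jobs are still pending.
    pending = asyncio.all_tasks(loop)
    # Run the loop again until every pending job has finished.
    loop.run_until_complete(asyncio.gather(*pending))
finally:
    loop.close()
```

Without the second run_until_complete() call, the loop would be closed while the three jobs were still pending.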
If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:
async def do_something_periodically():
    while True:
        asyncio.create_task(my_expensive_operation())
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break
    # Exclude the current task; otherwise this coroutine would wait on itself forever.
    await asyncio.gather(*(asyncio.all_tasks() - {asyncio.current_task()}))
Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:
async def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.create_task(my_expensive_operation()))
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break
    await asyncio.gather(*tasks)
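To tie this to the SIGTERM requirement in the question, the shutdown flag could be an asyncio.Event that a signal handler sets. The sketch below is one possible way to do that, not the only one. The names monitor, report, main, and completed are hypothetical, the intervals are placeholders, and loop.add_signal_handler() is only available on Unix.

```python
import asyncio
import signal

completed = []  # hypothetical record of finished jobs, for illustration only


async def report(n):
    # Stand-in for reading /proc/stat and sending the result to a server.
    await asyncio.sleep(0.05)
    completed.append(n)


async def monitor(interval, shutdown):
    tasks = []
    n = 0
    while not shutdown.is_set():
        tasks.append(asyncio.create_task(report(n)))
        n += 1
        try:
            # Sleep for `interval`, but wake up early if shutdown is requested.
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    # Stop scheduling and let the jobs already started run to completion.
    await asyncio.gather(*tasks)
    return n


async def main(interval=5.0):
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Unix only: set the flag when SIGTERM arrives.
    loop.add_signal_handler(signal.SIGTERM, shutdown.set)
    try:
        return await monitor(interval, shutdown)
    finally:
        loop.remove_signal_handler(signal.SIGTERM)
```

Started with asyncio.run(main()), this would keep scheduling jobs until the process receives SIGTERM, then wait for the jobs in flight before returning. Because the tasks list grows for as long as the monitor runs, a long-lived process would probably want to prune finished tasks from it.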
As of Python 3.7, the above answer uses multiple deprecated APIs (asyncio.async, Task.all_tasks, @asyncio.coroutine, yield from, etc.); you should use this instead:
import asyncio

async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))

async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        await asyncio.sleep(interval)

loop = asyncio.new_event_loop()
coro = do_something_periodically(1, 1)
try:
    loop.run_until_complete(coro)
except KeyboardInterrupt:
    # Stop the scheduler, then let the operations already started finish.
    coro.close()
    tasks = asyncio.all_tasks(loop)
    # Exclude the scheduler's own task (get_coro() requires Python 3.8+).
    expensive_tasks = {task for task in tasks if task.get_coro().__name__ != coro.__name__}
    loop.run_until_complete(asyncio.gather(*expensive_tasks))
I noticed some answers suggest using asyncio.gather(*asyncio.all_tasks()), but inside a coroutine that can wait forever, because the set includes asyncio.current_task(), which is the very task doing the waiting. Some answers suggest more complicated workarounds, such as checking coroutine names or len(asyncio.all_tasks()), but it turns out to be very simple with set operations:
import asyncio

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for every task except the current one, i.e. main(), to finish.
    await asyncio.gather(*(asyncio.all_tasks() - {asyncio.current_task()}))
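One caveat worth noting: by default, asyncio.gather() raises the first exception from any task as soon as it occurs, while the remaining tasks keep running, so the wait can end early. A possible variant passes return_exceptions=True so that every task is awaited and failures come back as values. The helper name wait_for_other_tasks and the flaky job are hypothetical, used only for illustration.

```python
import asyncio


async def wait_for_other_tasks():
    # Every task on the running loop except the one executing this coroutine.
    others = asyncio.all_tasks() - {asyncio.current_task()}
    # return_exceptions=True waits for all tasks and returns exceptions as results.
    return await asyncio.gather(*others, return_exceptions=True)


async def flaky(n):
    # Hypothetical job: odd-numbered jobs fail.
    await asyncio.sleep(0.01 * n)
    if n % 2:
        raise ValueError(n)
    return n


async def main():
    for n in range(4):
        asyncio.create_task(flaky(n))
    return await wait_for_other_tasks()


results = asyncio.run(main())
```

Since all_tasks() returns a set, the order of the results is not guaranteed; the caller can separate successes from failures with isinstance checks.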
Use a wrapper coroutine that waits until the pending task count is 1 before returning.
async def loop_job():
    asyncio.create_task(do_something_periodically())
    while len(asyncio.all_tasks()) > 1:  # Any task besides loop_job() itself?
        await asyncio.sleep(0.2)

asyncio.run(loop_job())