Asynchronous exception handling in Python
I have the following code, using `asyncio` and `aiohttp`, to make asynchronous HTTP requests:
```python
import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
        return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    task = asyncio.wait([fill_data(r) for r in runs])
    loop.run_until_complete(task)
    return runs

try:
    get_runs()
except Exception as e:
    print(repr(e))
    sys.exit(1)
```
For some reason, exceptions raised inside the `get` function are not caught by the `try`/`except` around `get_runs()`. Instead, I get:
```
Future/Task exception was never retrieved
Traceback (most recent call last):
  File "site-packages/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "mwe.py", line 25, in fill_data
    run['data'] = yield from get(url)
  File "mwe.py", line 17, in get
    raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'
```
So, what is the correct way to handle exceptions raised by coroutines?
Solution 1:
`asyncio.wait` doesn't actually consume the Futures passed to it; it just waits for them to complete and then returns the Future objects:
> coroutine `asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)`
>
> Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).
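Until you actually `yield from` the items in the `done` set (or call `.result()` or `.exception()` on them), their exceptions are never retrieved, so they never reach your `try`/`except`. Since your program exits without consuming the futures, asyncio logs the "exception was never retrieved" message when those futures are garbage-collected.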
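If you want to keep `asyncio.wait`, a minimal sketch of retrieving the exceptions yourself might look like this. It uses modern `async`/`await` syntax (Python 3.7+) rather than `@asyncio.coroutine`, and the `fetch` coroutine is a hypothetical stand-in for `get` that simulates a failing request with `asyncio.sleep` instead of calling aiohttp:

```python
import asyncio

async def fetch(name):
    # Hypothetical stand-in for get(): simulate I/O, then fail for 'two'
    await asyncio.sleep(0.01)
    if name == 'two':
        raise Exception('%s has error 404' % name)
    return 'data for %s' % name

async def main():
    # Wrap coroutines in tasks explicitly; Python 3.11+ rejects bare
    # coroutines passed to asyncio.wait
    tasks = [asyncio.create_task(fetch(n)) for n in ('one', 'two')]
    done, pending = await asyncio.wait(tasks)
    results, errors = [], []
    for task in done:
        # Calling exception() marks the exception as retrieved, so no
        # "exception was never retrieved" message is logged for it
        exc = task.exception()
        if exc is not None:
            errors.append(str(exc))
        else:
            results.append(task.result())
    return sorted(results), errors

results, errors = asyncio.run(main())
print(results)
print(errors)
```

Because `done` is a set, the completion order is not preserved; keep your own mapping from task to input if you need it.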
For your use case, it probably makes more sense to use `asyncio.gather`, which actually consumes each Future and then returns a single Future that aggregates all their results (or raises the first exception thrown by a future in the input list).
```python
def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[fill_data(r) for r in runs])
    loop.run_until_complete(tasks)
    return runs
```
Output:

```
GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)
```
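`@asyncio.coroutine` was removed in Python 3.11, so here is a sketch of the same `gather` approach in `async`/`await` syntax. It assumes Python 3.7+ (for `asyncio.run`); the `fill_data` below is a hypothetical stand-in that fails for `'two'` without touching the network, and the exception is stored in `caught` instead of calling `sys.exit(1)`:

```python
import asyncio

async def fill_data(run):
    # Hypothetical stand-in for the question's fill_data(): no network access
    await asyncio.sleep(0.01)
    if run['name'] == 'two':
        raise Exception('%s has error 404' % run['name'])
    run['data'] = 'data for %s' % run['name']

async def get_runs():
    runs = [{'name': 'one'}, {'name': 'two'}]
    # gather() re-raises the first exception from any of the coroutines,
    # so it propagates out of asyncio.run() to the caller
    await asyncio.gather(*(fill_data(r) for r in runs))
    return runs

try:
    asyncio.run(get_runs())
    caught = None
except Exception as e:
    caught = repr(e)

print(caught)
```

Here the top-level `try`/`except` sees the exception, which is exactly what the `asyncio.wait` version failed to do.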
Note that `asyncio.gather` lets you customize its behavior when one of the futures raises an exception. The default is to raise the first exception it hits, but it can also return each exception object in the output list:
> `asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)`
>
> Return a future aggregating results from the given coroutine objects or futures.
>
> All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If `return_exceptions` is `True`, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
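A short sketch of `return_exceptions=True`, again in `async`/`await` syntax (Python 3.7+), with a hypothetical `fetch` coroutine in place of the real HTTP call. Each slot of the result list holds either a return value or the exception that coroutine raised, in input order:

```python
import asyncio

async def fetch(name):
    # Hypothetical stand-in for get(): fails for 'two'
    await asyncio.sleep(0.01)
    if name == 'two':
        raise ValueError('%s has error 404' % name)
    return 'data for %s' % name

async def main():
    names = ('one', 'two')
    # Exceptions are returned in the list instead of being raised
    results = await asyncio.gather(*(fetch(n) for n in names),
                                   return_exceptions=True)
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            print('%s failed: %s' % (name, result))
        else:
            print('%s ok: %s' % (name, result))
    return results

results = asyncio.run(main())
```

This suits the question's case well: every URL is attempted, and you decide afterwards which failures matter.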
Solution 2:
To debug or "handle" exceptions in a done callback, start with a coroutine that either returns a result or raises an exception:
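```python
import asyncio
import traceback

@asyncio.coroutine
def async_something_entry_point(self):
    try:
        return self.real_stuff_which_throw_exceptions()
    except Exception:
        # Re-raise with an identifier and the full original traceback in the message
        raise Exception(some_identifier_here + ' ' + traceback.format_exc())
```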
And the callback:
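```python
import asyncio

def callback(self, future: asyncio.Future):
    if future.cancelled():
        # exception() would raise CancelledError on a cancelled future
        return
    exc = future.exception()
    if exc is not None:
        # asyncio.TimeoutError has an empty message, so log its type explicitly
        if isinstance(exc, asyncio.TimeoutError):
            self.logger('<Some id here> callback exception TimeoutError')
        else:
            self.logger('<Some id here> callback exception ' + str(exc))
        return  # future.result() would re-raise the exception
    # Store your result where you want
    self.result.append(future.result())
```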
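To attach such a callback, call `add_done_callback` on each task. The sketch below is a self-contained variant under a few assumptions: `callback` is a plain function rather than a method, the `work` coroutine is hypothetical, and module-level `results`/`errors` lists replace `self.result` and `self.logger`. It needs Python 3.7+:

```python
import asyncio

results, errors = [], []

def callback(future):
    # Plain-function version of the callback above (hypothetical)
    if future.cancelled():
        return
    exc = future.exception()  # marks the exception as retrieved
    if exc is not None:
        errors.append('%s: %s' % (type(exc).__name__, exc))
    else:
        results.append(future.result())

async def work(x):
    # Hypothetical coroutine that fails for negative input
    await asyncio.sleep(0.01)
    if x < 0:
        raise ValueError('negative input %d' % x)
    return x * 2

async def main():
    tasks = []
    for x in (1, -1, 3):
        task = asyncio.create_task(work(x))
        task.add_done_callback(callback)
        tasks.append(task)
    # return_exceptions=True keeps the failing task from aborting main();
    # the callbacks record every outcome
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
print(sorted(results))
print(errors)
```

Callbacks run on the event loop after each task finishes, so they should stay short and non-blocking.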