asyncio automatic cancellation of subtasks

Solution 1:

I managed to find an answer to my own question. Automatic task cancellation can be achieved via structured concurrency, which is not supported in the current version of Python (Python 3.10), although there has been a proposal to introduce TaskGroups following PEP 654.

Fortunately, there is the AnyIO library, which implements Trio-like structured concurrency on top of asyncio. The example in my question can be rewritten with AnyIO so that its tasks are cancelled automatically:

import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack

async def coroutine1(send_stream):
    """Send the value ``1`` on *send_stream* without any delay.

    The stream is entered as an async context manager for the duration of the
    send, so its ``__aexit__`` runs once the item has been handed over (for
    AnyIO memory object streams this presumably closes the send side).
    """
    async with send_stream:
        payload = 1
        await send_stream.send(payload)

async def coroutine2(send_stream, delay=1):
    """Wait *delay* seconds, then send the value ``2`` on *send_stream*.

    Args:
        send_stream: Stream used as an async context manager and exposing an
            awaitable ``send`` method.
        delay: Seconds to sleep before sending. Defaults to ``1``, the value
            that was previously hard-coded, so existing callers behave the
            same; pass a smaller value to shorten the demo.
    """
    async with send_stream:
        # The pause keeps this producer slower than coroutine1.
        await asyncio.sleep(delay)
        await send_stream.send(2)

async def process_result(receive_stream, send_stream):
    """Receive one item from *receive_stream*, then fail on purpose.

    Both streams are entered as async context managers (receive side first).
    After a single item has been received a bare ``Exception`` is raised, so
    nothing is ever sent on *send_stream*.

    Raises:
        Exception: Always, once the first item has been received.
    """
    async with receive_stream as source, send_stream as sink:
        received = await source.receive()
        # Deliberate failure used to demonstrate error propagation.
        raise Exception
        # Unreachable: the forwarding step the non-failing variant would run.
        await sink.send(received + 1)

async def process_results(
    receive_stream_2,
    receive_stream_3,
    *,
    task_status=TASK_STATUS_IGNORED,
):
    """Return the sum of one item from each of the two receive streams.

    ``task_status.started()`` is called before either stream is touched.
    Both streams are then entered as async context managers, and one item is
    awaited from *receive_stream_2* first and from *receive_stream_3* second.

    Args:
        receive_stream_2: First stream to read from.
        receive_stream_3: Second stream to read from.
        task_status: Object whose ``started()`` is invoked on entry; the
            default is presumably a no-op placeholder for direct calls.

    Returns:
        The two received items combined with ``+``.
    """
    task_status.started()
    async with receive_stream_2 as first_source, receive_stream_3 as second_source:
        first_item = await first_source.receive()
        second_item = await second_source.receive()
        return first_item + second_item
    

async def f():
    """Wire the demo pipeline together inside a single task group.

    Three memory object streams connect the tasks: coroutine1 feeds
    process_result, while coroutine2 and process_result feed process_results,
    whose return value is printed.
    """
    async with create_task_group() as tg:
        # One stream pair per hand-off, each created with an argument of 1.
        first_tx, first_rx = create_memory_object_stream(1)
        second_tx, second_rx = create_memory_object_stream(1)
        third_tx, third_rx = create_memory_object_stream(1)

        # Tasks are scheduled in the same order as before.
        tg.start_soon(coroutine1, first_tx)
        tg.start_soon(coroutine2, second_tx)
        tg.start_soon(process_result, first_rx, third_tx)

        # process_result raises, which is expected to cancel every task in tg
        # (including this pending await), so the print is normally not reached.
        total = await process_results(second_rx, third_rx)
        print(total)

# Script entry point: run the demo coroutine f() to completion with asyncio.
# NOTE(review): the exception raised in process_result is expected to surface
# here after the task group has cancelled the remaining tasks.
asyncio.run(f())