Make a CPU-bound task asynchronous for FastAPI WebSockets

I have a CPU-bound, long-running algorithm; let's call it task. Let's say it looks like this:

def task(parameters):
  result = 0
  for _ in range(10):
    for _ in range(10):
      for _ in range(10):
        result += do_things()
  return result

@app.get('/')
def results(parameters: BodyModel):
    return task(parameters)

If I encapsulate that in a def path operation function, everything works fine because it is started in a different thread. I can access multiple paths at the same time; concurrency is doing its job by pushing my CPU-bound task to a separate thread. But I want to switch to WebSockets now, to communicate intermediate results. For that to work, I have to mark my whole thing as asynchronous and pass the WebSocket into my task. So it looks like this:

async def task(parameters, websocket):
    result = 0
    for _ in range(10):
        for _ in range(10):
            for _ in range(10):
                intermediate_result = do_things()
                # send_text expects a str, so convert the number first
                await websocket.send_text(str(intermediate_result))
                result += intermediate_result
    return result

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        parameters = await websocket.receive_text()
        # pass the WebSocket explicitly; a str cannot take new attributes
        result = await task(parameters, websocket)
        await websocket.send_text(str(result))

It works like a charm for sending the intermediate results. But now my algorithm blocks FastAPI, as it is not truly asynchronous by itself. Once I post a message to '/ws', FastAPI is blocked and does not respond to any other requests until my task is finished.

So I need some advice on how to

  • a) either send WebSocket messages from within a synchronous CPU-bound task (I didn't find a synchronous send_text alternative) so I can use def, or
  • b) make my CPU-bound task truly asynchronous so that it no longer blocks anything when I use async def.

I tried using the ProcessPoolExecutor as described here, but it's not possible to pickle a coroutine, and as far as I understand I have to make my task a coroutine (using async) to use websocket.send_text() within it.

Also, I thought about just storing my intermediate results somewhere, making an HTTP POST to start my task, and then having another WebSocket connection read and send the intermediate results. But then I could just as well start a background task and implement a regular HTTP polling mechanism. I don't want either, mainly because I plan to use Google Cloud Run, which limits the CPU when all connections are closed. And I think it's better practice to teach my task how to communicate via WebSocket directly.

I hope my question is clear. It's my first larger-scale project with FastAPI and asynchronicity, and I haven't really used AsyncIO before, so I might have just missed something. Thanks for your suggestions.


Solution 1:

In case someone comes across this, I'll add the solution that works for me now.

I was following this: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

The key is to make it non-blocking. So for example, instead of:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)

I move the await and change the code to:

# 1. Run in the default loop's executor, without awaiting it yet:
thread = loop.run_in_executor(None, blocking_io)  # returns a Future immediately
while True:
    await asyncio.sleep(1)  # yields to the event loop between updates
    await websocket.send_text('status updates...')
    if internal_logger.blocking_stuff_finished:
        break
result = await thread
print('default thread pool', result)
await websocket.send_text(f'result: {result}')
await websocket.close()

This way my cpu_bound stuff runs in a separate thread that I don't await until it has finished, and everything works fine.
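
For anyone who wants to try the pattern without a running FastAPI app, here is a minimal sketch. The names Progress, blocking_work, run_with_updates and fake_send_text are hypothetical stand-ins (Progress plays the role of my internal_logger, and fake_send_text replaces websocket.send_text). It also polls future.done() instead of a hand-written finished flag, which is a small swap so the loop still ends if the work raises.

```python
import asyncio
import time


class Progress:
    """Hypothetical shared-state holder, standing in for internal_logger."""

    def __init__(self):
        self.steps_done = 0


def blocking_work(progress, steps=5):
    # Stand-in for the CPU-bound task; this runs in a worker thread.
    total = 0
    for i in range(steps):
        time.sleep(0.01)  # placeholder for do_things()
        total += i
        progress.steps_done = i + 1  # visible to the event loop thread
    return total


async def run_with_updates(send_text, poll_interval=0.005):
    # send_text is any async callable, e.g. websocket.send_text.
    loop = asyncio.get_running_loop()
    progress = Progress()
    # Start the work but do not await it yet.
    future = loop.run_in_executor(None, blocking_work, progress)
    while not future.done():
        await asyncio.sleep(poll_interval)  # hands control back to the loop
        await send_text(f"status: {progress.steps_done} steps done")
    result = await future  # already finished; also re-raises any exception
    await send_text(f"result: {result}")
    return result


async def demo():
    sent = []

    async def fake_send_text(message):
        # Collects messages instead of writing to a real WebSocket.
        sent.append(message)

    result = await run_with_updates(fake_send_text)
    return result, sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real endpoint, run_with_updates(websocket.send_text) would be awaited after websocket.accept().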

Making a custom thread pool also works, but we need to remove the context manager to keep it non-blocking, because leaving the with block waits for the pool to shut down.

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
    result = await loop.run_in_executor(pool, blocking_io)

would then become:

pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)
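
A slightly fuller sketch of the custom pool variant, assuming a hypothetical blocking_io that just sleeps. Since there is no with block any more, the pool is shut down by hand once the result has been awaited; shutdown(wait=False) returns immediately instead of blocking the event loop.

```python
import asyncio
import concurrent.futures
import time


def blocking_io():
    # Stand-in for the blocking work.
    time.sleep(0.05)
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    # No context manager: leaving a with block would wait for the pool.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    ticks = 0
    try:
        future = loop.run_in_executor(pool, blocking_io)
        while not future.done():
            await asyncio.sleep(0.01)  # the event loop stays responsive here
            ticks += 1
        result = await future
    finally:
        # Release the pool explicitly without blocking the event loop.
        pool.shutdown(wait=False)
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```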

In theory, the same would work for the ProcessPoolExecutor, but more work needs to be done as there is no shared memory and my termination condition wouldn't work as described above.

And yes, I know that cpu_bound stuff should preferably be done in a different process, but moving it to a separate thread does work in my case and I do enjoy the shared memory atm.
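
Since the work stays in a thread, option a) from the question is also within reach: asyncio.run_coroutine_threadsafe lets a plain def function running in a worker thread schedule a coroutine on the event loop. This is not what I ended up using, only a minimal sketch under that assumption; sync_task, handle and fake_send_text are hypothetical names, and fake_send_text stands in for websocket.send_text.

```python
import asyncio


def sync_task(loop, send_text, steps=3):
    # Plain synchronous function; runs in a worker thread, no await needed.
    result = 0
    for i in range(1, steps + 1):
        intermediate = i * i  # placeholder for do_things()
        # Schedule the coroutine on the event loop from this thread...
        fut = asyncio.run_coroutine_threadsafe(send_text(str(intermediate)), loop)
        # ...and block this worker thread (not the loop) until it was sent.
        fut.result(timeout=5)
        result += intermediate
    return result


async def handle(send_text):
    loop = asyncio.get_running_loop()
    # Awaiting here is fine: the event loop keeps running while the thread works.
    result = await loop.run_in_executor(None, sync_task, loop, send_text)
    await send_text(f"result: {result}")
    return result


async def demo():
    sent = []

    async def fake_send_text(message):
        # Collects messages instead of writing to a real WebSocket.
        sent.append(message)

    result = await handle(fake_send_text)
    return result, sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this shape the task itself decides when to send, so no polling loop or finished flag is needed in the endpoint.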