How can I wrap a synchronous function in an async coroutine?

I'm using aiohttp to build an API server that sends TCP requests to a separate server. The module that sends the TCP requests is synchronous and a black box for my purposes, so these requests block the entire API. I need a way to wrap the module's requests in an asynchronous coroutine that won't block the rest of the API.

So, just using sleep as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

Solution 1:

Eventually I found an answer in this thread. The method I was looking for is run_in_executor. It hands a synchronous function to an executor (a thread pool by default) and returns an awaitable, so the call runs without blocking the event loop.

In the sleep example I posted above, it might look like this:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

Also see the following answer -> How do we call a normal function where a coroutine is expected?
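
To check that the event loop really is free while the blocking call runs, here is a minimal sketch that starts two blocking calls at once and times them. `blocking_call` is a hypothetical stand-in for the black-box module, and the 0.3 second delay is arbitrary. The second call uses asyncio.to_thread, which assumes Python 3.9 or newer and also runs the function in a worker thread.

```python
import asyncio
import time


def blocking_call(delay):
    # Hypothetical stand-in for the synchronous black-box module.
    time.sleep(delay)
    return f"slept {delay}s"


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Both calls run in worker threads, so their waits overlap.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_call, 0.3),
        asyncio.to_thread(blocking_call, 0.3),  # Python 3.9+ only
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

If the calls blocked the loop, the total would be about the sum of the two delays; with an executor it should be close to a single delay.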

Solution 2:

You can use a decorator to turn a sync function into an async one.

import asyncio
import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

or use the aioify library

% pip install aioify

then

from aioify import aioify

@aioify
def sleep_async(delay):
    pass
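
As a usage sketch for the hand-written `wrap` decorator in this solution: the wrapper is repeated here only so the snippet runs on its own, and `fetch`, its delays, and the pool size of 2 are hypothetical. It shows the `executor` keyword being used to pass in a dedicated thread pool instead of the default one.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        # Bind all arguments up front; run_in_executor accepts no kwargs.
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run


@wrap
def fetch(name, delay=0.1):
    # Hypothetical blocking call.
    time.sleep(delay)
    return name.upper()


async def main():
    # A dedicated pool caps how many blocking calls run at once.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(
            fetch("a", executor=pool),
            fetch("b", delay=0.05, executor=pool),
        )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `loop` and `executor` are consumed by the wrapper, so the wrapped function cannot have parameters with those names.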

Solution 3:

A decorator is useful here: it runs your blocking function in another thread and gives you a coroutine to await.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Optional

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor] = None):
        self.executor = executor

    def __call__(self, blocking):
        @wraps(blocking)
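        async def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            # Create a thread pool lazily if none was supplied.
            if not self.executor:
                self.executor = ThreadPoolExecutor()
            # run_in_executor takes positional args only, so bind kwargs here.
            func = partial(blocking, *args, **kwargs)
            return await loop.run_in_executor(self.executor, func)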

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))
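
To see that the blocking function really runs in a different thread, a small sketch along these lines can help. It calls run_in_executor directly rather than going through the decorator, and `whoami` and the "blocking" thread name prefix are hypothetical names chosen for illustration.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def whoami():
    # Report the name of the thread this function is executing in.
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="blocking") as pool:
        worker = await loop.run_in_executor(pool, whoami)
    # First value: the thread running the event loop; second: the pool thread.
    return whoami(), worker


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The two names differ, which is why the event loop stays responsive while the blocking call is in progress.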