Using asyncio.Queue for producer-consumer flow

How can I modify the program above so that each producer is its own coroutine that can be scheduled concurrently with the consumers/workers?

The example can be generalized without changing its essential logic:

  • Move the insertion loop to a separate producer coroutine.
  • Start the consumers in the background, letting them process the items as they are produced.
  • With the consumers running, start the producers and wait for them to finish producing items, for example with await producer() or await asyncio.gather(*producers).
  • Once all producers are done, wait for consumers to process the remaining items with await queue.join().
  • Cancel the consumers, all of which are now idly waiting for the queue to deliver the next item, which will never arrive as we know the producers are done.

Here is an example implementing the above:

import asyncio, random
 
async def rnd_sleep(t):
    # sleep for t seconds on average
    await asyncio.sleep(t * random.random() * 2)
 
async def producer(queue):
    while True:
        # produce a token and send it to a consumer
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)
 
async def consumer(queue):
    while True:
        token = await queue.get()
        # process the token received from a producer
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')
 
async def main():
    queue = asyncio.Queue()
 
    # fire up both the producers and the consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]
 
    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')
 
    # wait for the remaining tasks to be processed
    await queue.join()
 
    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()
 
asyncio.run(main())
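
One optional refinement to the cancellation step, sketched here as an assumption rather than as part of the example above: cancel() only requests cancellation, and the task actually finishes the next time the event loop runs it. In the example this does not matter, because asyncio.run() cleans up leftover tasks on exit, but if main() went on to do other work you might want to wait for the consumers to really stop. The results list and the small fixed set of tokens are hypothetical, there only to keep the sketch deterministic:

```python
import asyncio

async def consumer(queue, results):
    while True:
        token = await queue.get()
        results.append(token)   # stand-in for real processing
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(consumer(queue, results))
                 for _ in range(3)]

    for token in range(5):
        await queue.put(token)
    await queue.join()

    # request cancellation of the idle consumers...
    for c in consumers:
        c.cancel()
    # ...and wait until they have actually stopped; with
    # return_exceptions=True each CancelledError is collected
    # in the result list instead of being raised here
    outcomes = await asyncio.gather(*consumers, return_exceptions=True)
    return results, outcomes

results, outcomes = asyncio.run(main())
```

After the gather() call every consumer task is finished, so nothing is left running behind main().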

Note that real-life producers and consumers, especially those that involve network access, should probably catch IO-related exceptions that occur during processing. If the exception is recoverable, as most network-related exceptions are, you can simply catch it and log the error. You should still invoke task_done(), because otherwise queue.join() will hang due to an unprocessed item. If it makes sense to retry processing the item, you can return it to the queue prior to calling task_done(). For example:

# like the above, but handling exceptions during processing
# (process() stands for your own coroutine that does the actual work):
import aiohttp

async def consumer(queue):
    while True:
        token = await queue.get()
        try:
            # this uses aiohttp or whatever
            await process(token)
        except aiohttp.ClientError as e:
            print(f"Error processing token {token}: {e}")
            # If it makes sense, return the token to the queue to be
            # processed again. (You can use a counter to avoid
            # processing a faulty token infinitely.)
            #await queue.put(token)
        queue.task_done()
        print(f'consumed {token}')
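
The counter mentioned in the comment above could look roughly like this. It is a minimal sketch under stated assumptions, not the only way to do it: each queue item is a (token, attempt) pair, MAX_ATTEMPTS is a made-up limit, and process() is a hypothetical stand-in that fails a configurable number of times with ConnectionError instead of doing real network I/O.

```python
import asyncio

MAX_ATTEMPTS = 3  # hypothetical limit on tries per token

async def process(token, fail_times):
    # hypothetical stand-in for network I/O: fails fail_times[token] times
    if fail_times.get(token, 0) > 0:
        fail_times[token] -= 1
        raise ConnectionError(f"transient failure for {token}")

async def consumer(queue, fail_times, done, dropped):
    while True:
        token, attempt = await queue.get()
        try:
            await process(token, fail_times)
        except ConnectionError as e:
            print(f"Error processing token {token} (attempt {attempt}): {e}")
            if attempt < MAX_ATTEMPTS:
                # re-queue before task_done() runs, so that
                # queue.join() keeps waiting for the retry
                await queue.put((token, attempt + 1))
            else:
                dropped.append(token)   # give up on this token
        else:
            done.append(token)
        finally:
            # called exactly once per get(), on success or failure
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    fail_times = {"b": 1, "c": 10}   # "c" never succeeds within the limit
    done, dropped = [], []
    consumers = [asyncio.create_task(consumer(queue, fail_times, done, dropped))
                 for _ in range(2)]
    for token in ("a", "b", "c"):
        await queue.put((token, 1))
    await queue.join()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return done, dropped

done, dropped = asyncio.run(main())
```

Carrying the attempt number inside the queue item keeps the consumers stateless, so it does not matter which consumer picks up the retry.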