How to properly make a queue? Why does sleep on consumer make the queue work?

Solution 1:

The code does not implement a queue from scratch; it extends queue.Queue to add memory. An event object is used to signal to the consumers that the producer thread has finished. There are hidden race conditions in the consumers when there is only one item left on the queue.
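For reference, this is roughly what such a class could look like. The question's class is not repeated here, so this is a hypothetical reconstruction that only reuses the names set_number, get_number and numbers from the code in this answer:

```python
import queue


class MyQueue(queue.Queue):
    """Hypothetical reconstruction: a queue.Queue that also remembers
    every number that went through set_number()."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize=maxsize)
        self.numbers = []  # history of everything produced

    def set_number(self, num: int) -> None:
        self.put(num)             # enqueue for the consumers...
        self.numbers.append(num)  # ...and record it (two separate steps)

    def get_number(self):
        return self.get()  # blocks until an item is available


q = MyQueue(maxsize=10)
q.set_number(3)
q.set_number(5)
print(q.get_number(), q.numbers)  # 3 [3, 5]
```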

The loop condition not q.empty() or not evnt.is_set() runs the loop body if either there is something in the queue or the event has not been set yet. Checking and then acting are two separate steps, so it could happen that:

  1. One thread sees that the queue is not empty and enters the loop
  2. A thread switch happens, and the other thread consumes the last item
  3. Execution switches back to the first thread, which calls get_number() on the now-empty queue and blocks forever
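The interleaving above can be forced deterministically for illustration. This sketch uses a plain queue.Queue instead of MyQueue, two demo-only events (checked and stolen) to force the thread switch at the worst moment, and get(timeout=...) so that the call which would block forever shows up as queue.Empty instead of hanging:

```python
import queue
import threading

q = queue.Queue()
q.put(42)  # exactly one item left

checked = threading.Event()  # demo only: A has passed the emptiness check
stolen = threading.Event()   # demo only: B has taken the last item
result = {}


def consumer_a():
    if not q.empty():   # 1. check: the queue is not empty...
        checked.set()
        stolen.wait()   # forced thread switch
        try:
            # 3. ...act: a plain q.get() would block forever here
            result["a"] = q.get(timeout=0.5)
        except queue.Empty:
            result["a"] = "blocked"


def consumer_b():
    checked.wait()
    result["b"] = q.get()  # 2. the other thread consumes the last item
    stolen.set()


threads = [threading.Thread(target=consumer_a), threading.Thread(target=consumer_b)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(result["a"], result["b"])  # blocked 42
```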

A similar race condition happens with the evnt.is_set() check:

  1. The producer adds the last item to the queue, and a thread switch happens before it sets the event
  2. A consumer takes the last item and goes back to the loop condition
  3. The queue is now empty, but the event has not been set yet, so the loop body runs again and get_number() blocks forever
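The same approach can reproduce this second race. Again this is a sketch with a plain queue.Queue; done plays the role of evnt, while reentered is a demo-only event that holds the producer back until the consumer has passed the loop condition a second time:

```python
import queue
import threading

q = queue.Queue()
done = threading.Event()       # the producer's "I have finished" flag
reentered = threading.Event()  # demo only: forces the bad interleaving
consumed, outcome = [], []


def producer():
    q.put(99)                  # the last item is added...
    reentered.wait(timeout=5)  # ...and a switch happens before the event is set
    done.set()                 # too late: the consumer is already inside get()


def consumer():
    while not q.empty() or not done.is_set():
        if consumed:
            # second pass: queue empty, event not set, so we are in the loop again
            reentered.set()
        try:
            # the first pass takes the last item; on the second pass a plain
            # q.get() would block forever, the timeout only makes that visible
            consumed.append(q.get(timeout=0.5))
        except queue.Empty:
            outcome.append("blocked")
            break


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed, outcome)  # [99] ['blocked']
```

Note that setting the event afterwards does not help: the consumer only looks at it in the loop condition, not while it is waiting inside get().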

Having the consumers sleep minimizes the chance of these interleavings, but it does not eliminate them. Without the sleep, it is very likely that a single consumer thread consumes all the queue items while the other one is still entering its loop.

Using timeouts is cumbersome. A useful idiom that avoids the event altogether is the two-argument form iter(callable, sentinel), using a value that can never be a real item as the sentinel:

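# --- snip ---
import concurrent.futures
import random

# MyQueue is the class defined in the question (not repeated here)
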
def produce_random_numbers(q: MyQueue, maxcount: int, n_consumers: int):
    for _ in range(maxcount):
        num = random.randint(1, 5)
        q.set_number(num)
    for _ in range(n_consumers):
        q.put(None)  # <--- plain put, not set_number: one sentinel per consumer


def consume_numbers(q: MyQueue, consumed: list):
    for num in iter(q.get_number, None):
        consumed.append(num)


if __name__ == "__main__":
    q = MyQueue(maxsize=10)
    cons1 = []
    cons2 = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        ex.submit(produce_random_numbers, q, 500000, 2)
        ex.submit(consume_numbers, q, cons1)
        ex.submit(consume_numbers, q, cons2)
    print(f'Generated Numbers: {q.numbers}')
# --- snip ---
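Because the snippet depends on MyQueue from the question, here is a self-contained sketch of the same sentinel idiom with a plain queue.Queue plus a separate history list. The history parameter, the SENTINEL name and the smaller item count are my own choices for the example:

```python
import concurrent.futures
import queue
import random

SENTINEL = None  # never a valid item: the producer only emits ints 1..5


def produce_random_numbers(q: queue.Queue, history: list, maxcount: int, n_consumers: int):
    for _ in range(maxcount):
        num = random.randint(1, 5)
        q.put(num)
        history.append(num)
    for _ in range(n_consumers):
        q.put(SENTINEL)  # one sentinel per consumer, so each one stops exactly once


def consume_numbers(q: queue.Queue, consumed: list):
    # iter(callable, sentinel) keeps calling q.get() until it returns SENTINEL
    for num in iter(q.get, SENTINEL):
        consumed.append(num)


history, cons1, cons2 = [], [], []
q = queue.Queue(maxsize=10)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
    futures = [
        ex.submit(produce_random_numbers, q, history, 1000, 2),
        ex.submit(consume_numbers, q, cons1),
        ex.submit(consume_numbers, q, cons2),
    ]
for f in futures:
    f.result()  # re-raises any exception raised inside a worker
print(len(history), len(cons1) + len(cons2))  # 1000 1000
```

Calling result() on the futures is worth the extra lines: an exception inside a submitted function is otherwise silently stored in its future and never shown.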

There are some other issues and things I would have done differently:

  • The event.set() after the with block is redundant: the producer has already set the event
  • There is a typo in the producer: it uses the global event variable instead of the evnt parameter. Fortunately, both refer to the same object.
  • set_number is not atomic. As there is only one producer, this causes no problem, but with several producers the order of MyQueue.numbers could differ from the order in which the items were added to the queue:
    1. put is called on one thread
    2. A thread switch happens
    3. A put + append happens in the new thread
    4. A thread switch happens, and the first value is appended
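With several producers, one possible fix (my assumption, not the only option) is to make put and append a single step under a lock that only producers take. A sketch, where _record_lock and the producer/consumer helpers are hypothetical names:

```python
import queue
import threading


class MyQueue(queue.Queue):
    """Hypothetical variant whose history always matches the queue order."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize=maxsize)
        self.numbers = []
        self._record_lock = threading.Lock()  # taken by producers only

    def set_number(self, num: int) -> None:
        # put + append form one step, so no other producer can slip in between.
        # Consumers never take this lock, so a put() blocked on a full queue
        # is still released by a consumer calling get().
        with self._record_lock:
            self.put(num)
            self.numbers.append(num)


q = MyQueue(maxsize=10)
taken = []


def producer(start: int) -> None:
    for n in range(start, start + 200):
        q.set_number(n)


def consumer() -> None:
    for _ in range(400):
        taken.append(q.get())


threads = [
    threading.Thread(target=producer, args=(0,)),
    threading.Thread(target=producer, args=(1000,)),
    threading.Thread(target=consumer),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(taken == q.numbers)  # True
```

With a single consumer reading FIFO, the consumed order equals the recorded order, which is exactly what the unlocked version cannot guarantee.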
  • Instead of defining MyQueue.set_number, I would have overridden put
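A minimal sketch of overriding put instead. It assumes None is only ever used as the stop sentinel, so it is enqueued but not recorded:

```python
import queue


class MyQueue(queue.Queue):
    """Sketch: record the history by overriding put() instead of adding set_number()."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize=maxsize)
        self.numbers = []

    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        if item is not None:  # assumption: None is only the stop sentinel
            self.numbers.append(item)


q = MyQueue()
for n in (4, 1, 2):
    q.put(n)
q.put(None)  # the sentinel is enqueued but not recorded
print(q.numbers, list(iter(q.get, None)))  # [4, 1, 2] [4, 1, 2]
```

Since queue.Queue.put_nowait() delegates to put(), items added that way are recorded too. The ordering caveat from the previous point still applies if several producers call put concurrently.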