How to properly make a queue? Why does sleep on consumer make the queue work?
Solution 1:
The code does not implement a queue from scratch, but extends queue.Queue to add memory (a record of the numbers that were put on it). An event object is used to signal to the consumers that the producer thread has finished. There are hidden race conditions in the consumers when there is only one item on the queue.
The loop condition, not q.empty() or not evnt.is_set(), runs the loop body if either there is something in the queue or the event has not been set. It could happen that:
- One thread sees that the queue is not empty and enters the loop
- A thread switch happens, and the other thread consumes the last item
- A switch happens back to the first thread, which calls get_number() and blocks
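The interleaving above can be replayed deterministically in a single thread. This is only a sketch: it uses a plain queue.Queue instead of the question's MyQueue, and get_nowait() stands in for the blocking call so that the failure shows up as an exception instead of a hang.

```python
import queue

q = queue.Queue()
q.put(1)  # only one item is left on the queue

# Consumer A evaluates the loop condition and decides to enter the loop.
a_enters_loop = not q.empty()

# Simulated thread switch: consumer B takes the last item.
b_item = q.get()

# Back in consumer A: its earlier check is now stale. A plain q.get() would
# block forever here, so get_nowait() is used to make the problem visible.
try:
    q.get_nowait()
    a_would_block = False
except queue.Empty:
    a_would_block = True

print(a_enters_loop, b_item, a_would_block)
```

The point is that q.empty() followed by q.get() is a check-then-act sequence, and nothing keeps the queue unchanged between the two calls.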
A similar race condition happens with the evnt.is_set() check:
- The last item is added to the queue by the producer, and a thread switch happens before the producer sets the event
- A consumer gets the last item and goes back to the loop condition
- As the event has not been set yet, the loop body is executed and get_number() blocks
Having the consumers sleep reduces the chance of these conditions happening, but does not eliminate them. Without waiting, it is very likely that a single consumer thread will consume all the queue items, while the other one is still entering its loop.
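For comparison, here is a sketch of how the event-based loop could be made safe with a timeout, so that a consumer never blocks indefinitely. It is an illustration under assumptions, not the code from the question: it uses a plain queue.Queue, and the names produce, consume and done are made up for this example.

```python
import queue
import threading


def produce(q, done, count):
    for i in range(count):
        q.put(i)
    done.set()  # set only after the last item has been put


def consume(q, done, out):
    while True:
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            # Once the event is set, nothing is added any more, so an
            # empty queue at this point stays empty and it is safe to stop.
            if done.is_set() and q.empty():
                return
            continue
        out.append(item)


q = queue.Queue(maxsize=10)
done = threading.Event()
out1, out2 = [], []
threads = [
    threading.Thread(target=produce, args=(q, done, 1000)),
    threading.Thread(target=consume, args=(q, done, out1)),
    threading.Thread(target=consume, args=(q, done, out2)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(out1) + len(out2))
```

Note that the exit test has to check both the event and the queue: a consumer whose get timed out just before the producer added its final items must not quit while items remain.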
Using timeouts is cumbersome. A useful idiom that avoids using events is to use iter with an impossible value as a sentinel:
# --- snip ---
def produce_random_numbers(q: MyQueue, maxcount: int, n_consumers: int):
    for _ in range(maxcount):
        num = random.randint(1, 5)
        q.set_number(num)
    for _ in range(n_consumers):
        q.put(None)  # <--- I use put to put one sentinel per consumer


def consume_numbers(q: MyQueue, consumed: list):
    # iter(callable, sentinel) keeps calling q.get_number() until it returns None
    for num in iter(q.get_number, None):
        consumed.append(num)


if __name__ == "__main__":
    q = MyQueue(maxsize=10)
    cons1 = []
    cons2 = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        ex.submit(produce_random_numbers, q, 500000, 2)
        ex.submit(consume_numbers, q, cons1)
        ex.submit(consume_numbers, q, cons2)
    print(f'Generated Numbers: {q.numbers}')
# --- snip ---
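Since the snippet above omits the imports and the MyQueue class, here is a self-contained sketch of the same sentinel idiom that can be run as is. It assumes a plain queue.Queue in place of MyQueue, and the function names produce and consume are only for illustration.

```python
import concurrent.futures
import queue
import random


def produce(q, count, n_consumers):
    produced = []
    for _ in range(count):
        num = random.randint(1, 5)
        produced.append(num)
        q.put(num)
    for _ in range(n_consumers):
        q.put(None)  # one sentinel per consumer
    return produced


def consume(q):
    # Calls q.get() repeatedly and stops at the first None it receives.
    return list(iter(q.get, None))


q = queue.Queue(maxsize=10)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
    producer = ex.submit(produce, q, 1000, 2)
    consumers = [ex.submit(consume, q) for _ in range(2)]

produced = producer.result()
consumed = [c.result() for c in consumers]
print(len(produced), [len(c) for c in consumed])
```

Each consumer stops at the first sentinel it receives, so it takes exactly one of them. That is why the producer has to put as many sentinels as there are consumers.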
There are some other issues and things I would have done differently:
- The event.set() after the with... block is useless: the event has already been set by the producer
- There is a typo in the producer and the global event variable is used instead of the local evnt parameter. Fortunately those refer to the same object.
- As there is only one producer, there will be no problem. Otherwise the order of MyQueue.numbers could be different from the order in which the items were added to the queue:
  - put is called on one thread
  - a thread switch happens
  - a put + append happens in the new thread
  - a thread switch happens, and the first value is appended
- Instead of defining MyQueue.set_number I would have overridden put
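The ordering problem with several producers could be avoided by recording the item while the queue's own lock is held. The sketch below is one possible way to do that, not the approach suggested above: it overrides _put instead of put. _put is an internal hook of CPython's queue.Queue (LifoQueue and PriorityQueue override it too), not part of the documented public API, and the RecordingQueue class is hypothetical because the original MyQueue is not shown.

```python
import queue
import threading


class RecordingQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.numbers = []

    def _put(self, item):
        # put() calls _put() while holding the queue's internal lock, so the
        # insertion and the append cannot be interleaved by other producers.
        super()._put(item)
        if item is not None:  # do not record None sentinels
            self.numbers.append(item)


def produce(q, start, count):
    for i in range(start, start + count):
        q.put(i)


q = RecordingQueue(maxsize=10)
producers = [
    threading.Thread(target=produce, args=(q, k * 500, 500)) for k in range(4)
]
for t in producers:
    t.start()

# A single consumer sees the items in the order they entered the queue.
consumed = [q.get() for _ in range(4 * 500)]
for t in producers:
    t.join()

print(consumed == q.numbers)
```

With a single consumer, the order in which the items come out is the order in which they went in, so it can be compared directly against the recorded list.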