Python multiprocessing queue get() timeout despite full queue
I am using Python's multiprocessing module for scientific parallel processing. My code uses several worker processes, which do the heavy lifting, and a writer process, which persists the results to disk. The data to be written is sent from the worker processes to the writer process via a Queue. The data itself is quite simple: a tuple holding a filename and a list of two floats. After several hours of processing the writer process often gets stuck. More precisely, the following block of code
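For reference, a minimal sketch of my setup (the function names, file names, and values here are illustrative, not my actual code):

from multiprocessing import Process, Queue

def worker(queue):
    # Heavy lifting happens here; each result is a (filename, [float, float]) tuple.
    queue.put(("result_0001.dat", [0.12, 3.45]))

def writer(queue):
    # Single consumer that persists incoming results to disk.
    while True:
        item = queue.get()
        if item is None:
            break
        filename, values = item
        # ... write values to filename ...

if __name__ == "__main__":
    queue = Queue(maxsize=48)
    writer_proc = Process(target=writer, args=(queue,))
    writer_proc.start()
    workers = [Process(target=worker, args=(queue,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    queue.put(None)   # sentinel: tell the writer to stop
    writer_proc.join()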
while True:
    try:
        item = queue.get(timeout=60)
        break
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
will never exit the loop and I get continuous 'Timeout' messages.
I also implemented a logging process which outputs, among other things, the status of the queue. Even though I get the timeout error message above, a call to qsize() consistently reports a full queue (size=48 in my case).
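The monitor is essentially just this (the interval is made up; note that qsize() is only approximate on multiprocessing queues):

import logging
import time

def monitor(queue, interval=10):
    # Periodically log the (approximate) queue fill level.
    while True:
        logging.info("Monitor: queue size = {}".format(queue.qsize()))
        time.sleep(interval)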
I have thoroughly checked the documentation on the queue object and cannot find any explanation for why get() times out while the queue is full at the same time.
Any ideas?
Edit:
I modified the code to make sure I catch an empty queue exception:
from queue import Empty   # on Python 2: from Queue import Empty

while True:
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
In multiprocessing, a queue is used as a synchronized message queue, which also seems to be the case in your problem. This, however, requires more than just a call to get(): after every task is processed you need to call task_done() so that the queue's counter of unfinished tasks is decremented (the element itself is already removed by get()). Note that a plain multiprocessing.Queue does not provide task_done(); you need a multiprocessing.JoinableQueue or a manager-based queue for that.
From the documentation:
Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
In the documentation you will also find a code example of proper threading queue usage.
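To illustrate the mechanism, here is a minimal self-contained sketch with a plain queue.Queue and one consumer thread (the names are made up for the example):

import queue      # 'Queue' on Python 2
import threading

q = queue.Queue()

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        # ... process item ...
        q.task_done()   # mark this task as finished

t = threading.Thread(target=consumer)
t.start()
for i in range(10):
    q.put(i)
q.join()        # returns once task_done() was called for all ten items
q.put(None)     # sentinel telling the consumer to exit
t.join()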
Applied to your code, it should look like this:
while True:
    try:
        item = queue.get(timeout=60)
        if item is None:
            break
        # call your worker function here
        queue.task_done()
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
Switching to a manager-based queue should help solve this issue, since the manager's queue is a proxy to a regular queue.Queue and therefore provides task_done() and join().
from multiprocessing import Manager

manager = Manager()
queue = manager.Queue()
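The manager's Queue() lives in the manager's server process, so, unlike a plain multiprocessing.Queue, the proxy supports task_done() and join(). A quick sanity check with the queue created above:

queue.put(("result_0001.dat", [0.12, 3.45]))
item = queue.get(timeout=60)
queue.task_done()   # available on the proxy, unlike on multiprocessing.Queue
queue.join()        # returns immediately: no unfinished tasks remain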
For more details you can check the multiprocessing documentation: https://docs.python.org/2/library/multiprocessing.html