Python Multiprocessing: Handling Child Errors in Parent
I am currently playing around with multiprocessing and queues. I have written a piece of code that exports data from MongoDB, maps it into a relational (flat) structure, converts all values to strings, and inserts them into MySQL.
Each of these steps is submitted as a process and given import/export queues, save for the MongoDB export, which is handled in the parent.
As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not notice and the rest just keeps running. What I want is for the whole shebang to quit and, ideally, for the parent to re-raise the child's error.
I have two questions:
- How do I detect the child error in the parent?
- How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" to the queue to kill the child is pretty dirty.
I am using Python 2.7.
Here are the essential parts of my code:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
           for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
              for i in range(10)]
# create mysql writer
# I create a list of writers. currently only one,
# but I have the option to parallelize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q,
                                      columns, 'w_'+mysql_table, 1000) for i in range(1)]
# starting mapper
for mapper in mappers:
    mapper.start()
time.sleep(1)
# starting converter
for converter in converters:
    converter.start()
# starting writer
for writer in writers:
    writer.start()
[... initializing mongo db connection ...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
    mongo_input_result_q.put(row)
    count += 1
    if count % log_counter == 0:
        print 'Mongo Reader' + " " + str(count)
print "MongoReader done"
# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
    mongo_input_result_q.put(None)
for mapper in mappers:
    mapper.join()
for converter in converters:
    mapper_result_q.put(None)
for converter in converters:
    converter.join()
for writer in writers:
    converter_result_q.put(None)
for writer in writers:
    writer.join()
Solution 1:
Why not let the Process take care of its own exceptions, like this:
from __future__ import print_function
import multiprocessing as mp
import traceback
class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None
    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to
    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception
Now you have both the error and the traceback at hand:
def target():
    raise ValueError('Something went wrong...')
p = Process(target=target)
p.start()
p.join()
if p.exception:
    error, tb = p.exception  # named tb so the traceback module is not shadowed
    print(tb)
Regards, Marek
Solution 2:
I don't know the standard practice, but I have found that for reliable multiprocessing I need to design the methods, classes, etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).
Specifically what I do is:
- Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
- always provide a shared error multiprocessing.Queue from the main process to each worker process
- enclose the entire run code in a try: ... except Exception as e. Then when something unexpected happens send an error package with:
  - the process id that died
  - the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
- of course handle expected issues as normal within the normal operation of the worker
- (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
  - define a stop token in the class or for functions.
  - When the main process wants the worker(s) to stop, just send the stop token. To stop everyone, send enough for all the processes.
  - the wrapping loop checks the input q for the token or whatever other input you want
The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.
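The pattern described above might be sketched roughly as follows. This is only an illustration under assumptions, not the answerer's actual code: it targets Python 3, and the names `STOP`, `process_items`, `parse_int`, and the layout of the error package are all hypothetical.

```python
import multiprocessing as mp
import os
import traceback

# Hypothetical stop token; pick a value that can never appear as real data.
STOP = "__STOP__"


def process_items(in_q, out_q, err_q, handle):
    """Worker loop: apply handle() to each item until STOP is read."""
    try:
        while True:
            item = in_q.get()
            if item == STOP:
                break
            out_q.put(handle(item))
    except Exception as exc:
        # Error package: which process died, what happened, and the
        # original traceback as text (text always pickles).
        err_q.put({
            "pid": os.getpid(),
            "error": repr(exc),
            "traceback": traceback.format_exc(),
        })


def parse_int(text):
    """Example handler (an assumption): raises ValueError on bad input."""
    return int(text)


def main():
    in_q, out_q, err_q = mp.Queue(), mp.Queue(), mp.Queue()
    workers = [mp.Process(target=process_items,
                          args=(in_q, out_q, err_q, parse_int))
               for _ in range(2)]
    for w in workers:
        w.start()
    for text in ("1", "2", "oops"):
        in_q.put(text)
    for _ in workers:  # one stop token per worker
        in_q.put(STOP)
    for w in workers:
        w.join()
    results = []
    while not out_q.empty():
        results.append(out_q.get())
    print("results:", sorted(results))
    while not err_q.empty():
        package = err_q.get()
        print("worker", package["pid"], "failed:", package["error"])


if __name__ == "__main__":
    main()
```

Joining before draining the queues is only safe here because the payloads are tiny; with larger results the parent should read from the queues before calling join().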
Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?
Solution 3:
@mrkwjc's solution is simple and easy to understand and implement, but it has one disadvantage. When we have several processes and want to stop all of them as soon as any single process fails, we still have to wait until all processes have finished before we can check p.exception. Below is code that fixes this problem (i.e. when one child fails, the other child is terminated as well):
import multiprocessing
import traceback
from time import sleep
class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://stackoverflow.com/a/33599967/4992248
    """
    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None
    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still raise this exception if you need to
    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception
class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))
class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))
def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()
        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()
        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))
        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))
        task_1_process.start()
        task_2_process.start()
        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)
            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception
                # Do not wait until task_2 is finished
                task_2_process.terminate()
                raise ChildProcessError(task_1_traceback)
            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception
                # Do not wait until task_1 is finished
                task_1_process.terminate()
                raise ChildProcessError(task_2_traceback)
        task_1_process.join()
        task_2_process.join()
        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()
        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']
    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())
if __name__ == "__main__":
    main()
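The two-process polling loop above could be generalized to any number of children. The following is a minimal sketch under assumptions, not part of the original answer: it targets Python 3 (for `ChildProcessError` and `super()`), and `ReportingProcess`, `wait_all`, `poll_interval`, `ok_task`, and `bad_task` are hypothetical names. The subclass is repeated only to keep the example self-contained.

```python
import multiprocessing
import time
import traceback


class ReportingProcess(multiprocessing.Process):
    """Sends (exception, traceback text) to the parent, as in the class above."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            super().run()
            self._child_conn.send(None)
        except Exception as e:
            self._child_conn.send((e, traceback.format_exc()))

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


def wait_all(processes, poll_interval=0.1):
    """Wait for all processes; on the first reported failure, terminate
    the remaining ones and raise ChildProcessError in the parent."""
    pending = list(processes)
    while pending:
        for proc in list(pending):
            # Read is_alive() first: if the child has already exited, any
            # error it sent is guaranteed to be waiting in the pipe.
            alive = proc.is_alive()
            failure = proc.exception
            if failure:
                _error, tb_text = failure
                for other in processes:
                    if other.is_alive():
                        other.terminate()
                for other in processes:
                    other.join()
                raise ChildProcessError(tb_text)
            if not alive:
                pending.remove(proc)
        if pending:
            time.sleep(poll_interval)


def ok_task():
    time.sleep(5)


def bad_task():
    raise ValueError("Something went wrong...")


if __name__ == "__main__":
    procs = [ReportingProcess(target=ok_task),
             ReportingProcess(target=bad_task)]
    for p in procs:
        p.start()
    try:
        wait_all(procs)
    except ChildProcessError as err:
        print("a child failed:")
        print(err)
```

A short polling interval keeps the reaction time low; the sleep(10) in the answer above means a failure can go unnoticed for up to ten seconds.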
Solution 4:
Thanks to kobejohn, I have found a solution which is nice and stable.
- I have created a subclass of multiprocessing.Process which implements some functions and overrides the run() method to wrap a new saferun() method in a try/except block. This class requires a feedback_queue at initialization, which is used to report info, debug, and error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package:
    class EtlStepProcess(multiprocessing.Process):
        def __init__(self, feedback_queue):
            multiprocessing.Process.__init__(self)
            self.feedback_queue = feedback_queue
        def log_info(self, message):
            log_info(self.feedback_queue, message, self.name)
        def log_debug(self, message):
            log_debug(self.feedback_queue, message, self.name)
        def log_error(self, err):
            log_error(self.feedback_queue, err, self.name)
        def saferun(self):
            """Method to be run in sub-process; can be overridden in sub-class"""
            if self._target:
                self._target(*self._args, **self._kwargs)
        def run(self):
            try:
                self.saferun()
            except Exception as e:
                self.log_error(e)
                raise e
            return
- I have subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than run(). This way I do not have to add a try/except block around it, since this is already done by the run() method. Example:
    class MySqlWriter(EtlStepProcess):
        def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                     input_queue, feedback_queue):
            EtlStepProcess.__init__(self, feedback_queue)
            self.mysql_host = mysql_host
            self.mysql_user = mysql_user
            self.mysql_passwd = mysql_passwd
            self.mysql_schema = mysql_schema
            self.mysql_table = mysql_table
            self.columns = columns
            self.commit_count = commit_count
            self.input_queue = input_queue
        def saferun(self):
            self.log_info(self.name + " started")
            # create mysql connection
            engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd +
                                              '@' + self.mysql_host + '/' + self.mysql_schema)
            meta = sqlalchemy.MetaData()
            table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
            connection = engine.connect()
            try:
                self.log_info("start MySQL insert")
                counter = 0
                row_list = []
                while True:
                    next_row = self.input_queue.get()
                    if isinstance(next_row, Terminator):
                        if counter % self.commit_count != 0:
                            connection.execute(table.insert(), row_list)
                        # Poison pill means we should exit
                        break
                    row_list.append(next_row)
                    counter += 1
                    if counter % self.commit_count == 0:
                        connection.execute(table.insert(), row_list)
                        del row_list[:]
                        self.log_debug(self.name + ' ' + str(counter))
            finally:
                connection.close()
            return
- In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps, then reads from MongoDB and puts values into the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it prints the error and terminates its child, which in turn terminates all its children before dying.
    if __name__ == '__main__':
        feedback_q = multiprocessing.Queue()
        p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
        p.start()
        while p.is_alive():
            fb = feedback_q.get()
            if fb["type"] == "error":
                p.terminate()
                print "ERROR in " + fb["process"] + "\n"
                for child in multiprocessing.active_children():
                    child.terminate()
            else:
                print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                      fb["process"] + ": " + fb["message"]
        p.join()
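The code above calls module-level log_info, log_debug, and log_error functions and a Terminator class that are not shown. The following is a guess at what they might look like, not the author's actual implementation: the dictionary keys are taken from what the main loop reads (type, process, timestamp, message), while the helper `_feedback` and the type strings "info" and "debug" are assumptions.

```python
import time
import traceback


class Terminator(object):
    """Poison pill: a worker stops when it reads an instance from its input queue."""


def _feedback(feedback_queue, msg_type, message, process_name):
    # Hypothetical helper. The keys mirror the ones the main loop reads.
    feedback_queue.put({
        "type": msg_type,
        "process": process_name,
        "timestamp": time.time(),
        "message": message,
    })


def log_info(feedback_queue, message, process_name):
    _feedback(feedback_queue, "info", message, process_name)


def log_debug(feedback_queue, message, process_name):
    _feedback(feedback_queue, "debug", message, process_name)


def log_error(feedback_queue, err, process_name):
    # Send text instead of the exception object so the message always pickles.
    text = "".join(traceback.format_exception_only(type(err), err)).strip()
    _feedback(feedback_queue, "error", text, process_name)
```

Sending the error as text avoids failures when an exception class cannot be pickled across the process boundary, at the cost of not being able to re-raise the original object in the parent.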
I am thinking about making a module out of it and putting it up on GitHub, but I have to do some cleaning up and commenting first.