How do I use an initializer to set up my multiprocessing pool?

I'm trying to use the multiprocessing Pool object. I'd like each worker process to open a database connection when it starts, then use that connection to process the data passed to it (rather than opening and closing a connection for every item). This seems like exactly what the initializer is for, but I can't wrap my head around how the worker and the initializer communicate. So far I have something like this:

import psycopg2
from multiprocessing import Pool

def get_cursor():
    return psycopg2.connect(...).cursor()

def process_data(data):
    # here I'd like to have the cursor so that I can do things with the data
    pass

if __name__ == "__main__":
    pool = Pool(initializer=get_cursor, initargs=())
    pool.map(process_data, get_some_data_iterator())

How do I (or can I) get the cursor returned by get_cursor() into process_data()?


Solution 1:

The initializer is called like this, at the start of the worker function in the standard library's multiprocessing/pool.py:

# multiprocessing/pool.py (excerpt)
def worker(...):
    ...
    if initializer is not None:
        initializer(*initargs)

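so its return value is simply thrown away. You might think this dooms you, but no! Each worker runs in a separate process, so you can use an ordinary module-level global variable: whatever the initializer assigns to it stays there for every task that worker later runs.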
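To check this with only the standard library, here is a small sketch (the names init_worker and task are hypothetical): the initializer bumps a module-level counter and returns a value that Pool ignores, and each task reports its process ID together with that process's counter. Every task should see a counter of exactly 1, meaning the initializer ran once per worker rather than once per item, while the parent's copy of the counter stays at 0.

```python
import os
from multiprocessing import Pool

# Module-level counter; every worker process gets its own independent copy.
init_calls = 0

def init_worker():
    """Hypothetical initializer: runs once in each worker process."""
    global init_calls
    init_calls += 1
    return "discarded"  # Pool never looks at this return value

def task(x):
    # Report which process ran this item and how often its initializer ran.
    return os.getpid(), init_calls

if __name__ == "__main__":
    with Pool(processes=2, initializer=init_worker) as pool:
        results = pool.map(task, range(20))
    print(results)
```

The parent never runs the initializer, which is why its own init_calls is untouched after the pool finishes.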

This is not exactly pretty, but it works:

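import psycopg2

cursor = None  # module-level; each worker process gets its own copy

def set_global_cursor():
    global cursor
    cursor = psycopg2.connect(...).cursor()  # connection arguments elided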

Now pass set_global_cursor as the initializer (Pool(initializer=set_global_cursor)) and use cursor directly in your process_data function. Each worker process has its own copy of the cursor global, so the workers do not step on each other.
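A runnable sketch of this pattern, with one loudly flagged assumption: it uses the standard-library sqlite3 module with an in-memory database as a stand-in for psycopg2, so it runs without a PostgreSQL server. The query is a placeholder; in real code you would swap in your own psycopg2.connect(...) call and SQL.

```python
import os
import sqlite3
from multiprocessing import Pool

# One cursor per worker process; stays None in the parent.
cursor = None

def set_global_cursor():
    """Pool initializer: runs once in each worker process."""
    global cursor
    # Assumption: sqlite3 in-memory DB stands in for psycopg2.connect(...).cursor().
    cursor = sqlite3.connect(":memory:").cursor()

def process_data(data):
    # Reuse this worker's cursor instead of reconnecting for every item.
    cursor.execute("SELECT ? * ?", (data, data))
    (square,) = cursor.fetchone()
    return os.getpid(), square

if __name__ == "__main__":
    with Pool(processes=2, initializer=set_global_cursor) as pool:
        results = pool.map(process_data, range(10))
    print([square for _, square in results])
    print("worker processes used:", len({pid for pid, _ in results}))
```

At most two connections are opened here (one per worker), no matter how many items are mapped.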

(I have no idea whether psycopg2 has a different way to deal with this that does not involve using multiprocessing in the first place; this is meant as a general answer to a general problem with the multiprocessing module.)

Solution 2:

You can also pass the worker function itself to the initializer, open the connection there, and then attach the cursor to the function as an attribute:

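import psycopg2

def init_worker(function):
    # runs once per worker; stores a process-local cursor on the function
    function.cursor = psycopg2.connect(...).cursor()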

Each worker can then reach its own cursor through function.cursor, without using globals, for example:

from multiprocessing import Pool

def use_db(i):
    print(use_db.cursor)  # process-local cursor set by init_worker

if __name__ == "__main__":
    pool = Pool(initializer=init_worker, initargs=(use_db,))
    pool.map(use_db, range(10))
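A self-contained sketch of the function-attribute approach, again assuming sqlite3 with an in-memory database as a stand-in for psycopg2 so it runs anywhere; the SQL is a placeholder. It works because the function object passed through initargs and the one that pool.map sends to the worker both resolve to the same module-level use_db inside that worker process.

```python
import sqlite3
from multiprocessing import Pool

def init_worker(function):
    """Pool initializer: attach a per-process cursor to the worker function."""
    # Assumption: sqlite3 in-memory DB stands in for psycopg2.connect(...).cursor().
    function.cursor = sqlite3.connect(":memory:").cursor()

def use_db(i):
    # use_db.cursor exists only inside worker processes, set by init_worker.
    use_db.cursor.execute("SELECT ? + 1", (i,))
    return use_db.cursor.fetchone()[0]

if __name__ == "__main__":
    with Pool(processes=2, initializer=init_worker, initargs=(use_db,)) as pool:
        results = pool.map(use_db, range(10))
    print(results)
    # The parent's use_db never had the attribute set.
    print(hasattr(use_db, "cursor"))
```

Compared with the global-variable version, this keeps the cursor scoped to the one function that uses it, at the cost of being a little less obvious to readers.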