Python Pool with worker Processes

I am trying to use a worker Pool in Python using Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't figure out how to use map() to call my worker's compute() function.

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes = 4,
                initializer = Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)

Is a job queue the way to go instead, or can I use map()?


I would suggest that you use a Queue for this.

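from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        # run() executes in the child process once start() is called
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        # iter(callable, sentinel) calls queue.get() until it returns None
        for data in iter(self.queue.get, None):
            # Use data
            pass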

Now you can start a pile of these, all getting work from a single queue:

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
# the_real_source stands for whatever iterable produces your jobs
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)

That kind of thing should let each worker pay the expensive startup cost only once and amortize it over all the jobs it processes.
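
The workers above consume jobs but never hand anything back. Here is a minimal sketch of one way to collect results, assuming a second queue for the output. The names result_queue and run_jobs are illustrative and not part of the original answer, and squaring stands in for the real computation. Each job is tagged with its index so the results can be put back in input order:

```python
from multiprocessing import Process, Queue


class Worker(Process):
    def __init__(self, request_queue, result_queue):
        super().__init__()
        self.request_queue = request_queue
        self.result_queue = result_queue

    def run(self):
        # Expensive one-time initialization would go here (runs in the child).
        # Pull (index, data) jobs until the None sentinel arrives.
        for index, data in iter(self.request_queue.get, None):
            self.result_queue.put((index, data * data))


def run_jobs(items, n_workers=4):
    """Hypothetical helper: square every item using n_workers processes."""
    items = list(items)
    request_queue, result_queue = Queue(), Queue()
    workers = [Worker(request_queue, result_queue) for _ in range(n_workers)]
    for worker in workers:
        worker.start()
    for job in enumerate(items):
        request_queue.put(job)
    # One sentinel per worker so every worker leaves its loop.
    for _ in workers:
        request_queue.put(None)
    # Read all results before joining; joining first can block while a
    # worker still has unflushed items on the result queue.
    results = [result_queue.get() for _ in items]
    for worker in workers:
        worker.join()
    # Results arrive in completion order, so sort by index.
    return [value for _, value in sorted(results)]


if __name__ == "__main__":
    print(run_jobs(range(10)))
```

This is roughly what Pool.map() does for you, so it is mostly worth the extra code when you need control over the worker's lifecycle.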


initializer expects an arbitrary callable that performs initialization (e.g., it can set some globals), not a Process subclass; map accepts an arbitrary iterable:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
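
To tie this back to the question's expensive per-worker setup, here is a minimal sketch of an initializer that stores its result in a module-level global, which compute() then reads. The names _state, init_worker, run and the offset parameter are made up for illustration; the dictionary stands in for whatever your real initialization builds:

```python
import multiprocessing as mp
import os

_state = None  # per-process state, filled in by init_worker in each worker


def init_worker(offset):
    """Runs once in every worker process when the pool starts it."""
    global _state
    # Stand-in for the expensive initialization.
    _state = {"offset": offset, "pid": os.getpid()}


def compute(data):
    # Uses the state that init_worker prepared in this worker process.
    return data * data + _state["offset"]


def run(data, offset=0, processes=4):
    """Hypothetical helper: map compute over data with initialized workers."""
    with mp.Pool(processes=processes,
                 initializer=init_worker,
                 initargs=(offset,)) as pool:
        return pool.map(compute, data)


if __name__ == "__main__":
    print(run(range(10)))
```

Each worker gets its own copy of _state, so nothing is shared between processes; the initialization simply runs once per worker instead of once per job.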