How to Multi-thread an Operation Within a Loop in Python

Say I have a very large list and I'm performing an operation like so:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

My issue is twofold:

  • There are a lot of items
  • api.my_operation takes forever to return

I'd like to use multi-threading to spin up a bunch of api.my_operations at once so I can process maybe 5 or 10 or even 100 items at once.

If my_operation() raises an exception (because maybe I already processed that item), that's OK. It won't break anything. The loop can continue to the next item.

Note: this is for Python 2.7.3


First, in Python, if your code is CPU-bound, multithreading won't help, because only one thread can hold the Global Interpreter Lock, and therefore run Python code, at a time. So, you need to use processes, not threads.

This is not true if your operation "takes forever to return" because it's IO-bound—that is, waiting on the network or disk copies or the like. I'll come back to that later.


Next, the way to process 5 or 10 or 100 items at once is to create a pool of 5 or 10 or 100 workers, and put the items into a queue that the workers service. Fortunately, the stdlib multiprocessing and concurrent.futures libraries both wrap up most of the details for you.

The former is more powerful and flexible for traditional thread/process-style programming; the latter is simpler if all you need is to queue up tasks and wait on futures; for trivial cases, it really doesn't matter which you choose. (In this case, the most obvious implementation takes 3 lines with futures and 4 lines with multiprocessing.)

If you're using 2.6-2.7 or 3.0-3.1, futures isn't built in, but you can install it from PyPI (pip install futures).


Finally, it's usually a lot simpler to parallelize things if you can turn the entire loop iteration into a function call (something you could, e.g., pass to map), so let's do that first:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Putting it all together:

import concurrent.futures

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

If you have lots of relatively small jobs, the overhead of multiprocessing might swamp the gains. The way to solve that is to batch up the work into larger jobs. For example (using grouper from the itertools recipes, which you can copy and paste into your code, or get from the more-itertools project on PyPI):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)
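
In case it's handy, here's a minimal version of that grouper recipe, with the argument order matching the grouper(5, items) call above (some versions of the recipe take (iterable, n) instead):

from itertools import izip_longest  # itertools.zip_longest on Python 3

def grouper(n, iterable, fillvalue=None):
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    # If len(iterable) isn't an exact multiple of n, the last group is padded
    # with fillvalue; those extra Nones just trip the except clause above.
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)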

Finally, what if your code is IO bound? Then threads are just as good as processes, and with less overhead (and fewer limitations, but those limitations usually won't affect you in cases like this). Sometimes that "less overhead" is enough to mean you don't need batching with threads, but you do with processes, which is a nice win.

So, how do you use threads instead of processes? Just change ProcessPoolExecutor to ThreadPoolExecutor.
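
In other words, the only line that changes in the example above is the one that creates the pool:

executor = concurrent.futures.ThreadPoolExecutor(10)  # threads instead of processes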

If you're not sure whether your code is CPU-bound or IO-bound, just try it both ways.
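
One crude way to do that is to time each variant on a representative slice of your data (a rough sketch; the 10 workers and the 1000-item sample size are arbitrary):

import time
import concurrent.futures

sample = items[:1000]  # just a slice, so the experiment finishes quickly

for pool_class in (concurrent.futures.ProcessPoolExecutor,
                   concurrent.futures.ThreadPoolExecutor):
    executor = pool_class(10)
    start = time.time()
    futures = [executor.submit(try_my_operation, item) for item in sample]
    concurrent.futures.wait(futures)
    executor.shutdown()
    print('%s: %.1f seconds' % (pool_class.__name__, time.time() - start))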


Can I do this for multiple functions in my Python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize, is it possible to have two multi-threaded functions in the same script?

Yes. In fact, there are two different ways to do it.

First, you can share the same (thread or process) executor and use it from multiple places with no problem. The whole point of tasks and futures is that they're self-contained; you don't care where they run, just that you queue them up and eventually get the answer back.
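
For example (a sketch; my_other_operation and other_things stand in for whatever your second loop does):

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(10)

# one loop, in one part of the program
futures = [executor.submit(try_my_operation, item) for item in items]

# another loop, somewhere else entirely, reusing the same executor
futures += [executor.submit(my_other_operation, thing) for thing in other_things]

concurrent.futures.wait(futures)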

Alternatively, you can have two executors in the same program with no problem. This has a performance cost—if you're using both executors at the same time, you'll end up trying to run (for example) 16 busy threads on 8 cores, which means there's going to be some context switching. But sometimes it's worth doing because, say, the two executors are rarely busy at the same time, and it makes your code a lot simpler. Or maybe one executor is running very large tasks that can take a while to complete, and the other is running very small tasks that need to complete as quickly as possible, because responsiveness is more important than throughput for part of your program.
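
A sketch of that last arrangement (quick_task and quick_items are placeholders for the small, latency-sensitive work):

import concurrent.futures

# a small pool of processes for the big, slow jobs...
heavy = concurrent.futures.ProcessPoolExecutor(4)
# ...and a separate pool of threads for the small, urgent ones
light = concurrent.futures.ThreadPoolExecutor(10)

heavy_futures = [heavy.submit(try_my_operation, item) for item in items]
light_futures = [light.submit(quick_task, x) for x in quick_items]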

If you don't know which is appropriate for your program, usually it's the first.


There's also multiprocessing.pool, and the following sample illustrates how to use one of its pool implementations:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Now, if you determine that your workload is indeed CPU-bound, as @abarnert mentioned, change ThreadPool to the process pool implementation (the commented-out import under the ThreadPool import). You can find more details here: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
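
For what it's worth, the apply_async loop above can also be written with Pool.map, which blocks until every item has been processed (same worker and pool_size as before):

from multiprocessing.pool import ThreadPool as Pool  # same import as above

pool = Pool(pool_size)
pool.map(worker, items)   # map blocks until all the work is done
pool.close()
pool.join()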


You can split the processing into a specified number of threads using an approach like this:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)


Another option along the same lines is to chunk the items with numpy's array_split and give each thread its own chunk:

import numpy as np
import threading


def threaded_process(items_chunk):
    """Process one chunk of items; each chunk runs in its own thread."""
    for item in items_chunk:
        try:
            api.my_operation(item)
        except Exception:
            print('error with item')

n_threads = 20
# Split the items into as many chunks as there are threads
array_chunk = np.array_split(items, n_threads)
thread_list = []
for thr in range(n_threads):
    thread = threading.Thread(target=threaded_process, args=(array_chunk[thr],))
    thread_list.append(thread)
    thread_list[thr].start()

for thread in thread_list:
    thread.join()