Python Multiprocessing Pipe is very slow (>100ms)
Here is one possible solution, using the multiprocessing objects Process and Queue.
I measured its throughput: on average it takes 150 µs (microseconds, printed as mcs in the output) to process one task that does almost nothing. Processing just takes an integer from the task, adds 1 to it and sends it back. I think a delay of 150 microseconds should be more than enough for you to process 30 FPS.
Queue is used instead of your Pipe, as I think it is more suitable for multi-task processing. And if your time measurements are precise, then Queue is also about 660 times faster than your Pipe setup (150 microseconds compared to a 100 millisecond delay).
Notice that the processing loop sends tasks in batches: first it sends many tasks to all processes, and only after that does it gather all the processed results. This kind of batching makes processing smooth, compared to sending just 1 task at a time and then gathering a few results.
Even better would be to send tasks to the processes and gather the results asynchronously in separate lightweight threads. This prevents you from blocking while waiting for the slowest process to finish its tasks.
Processes are signalled to finish and exit by sending a None task to them.
def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time
    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()
    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size
    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()
Output:
Processing speed: 150.7 mcs per task
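The suggestion above about gathering results in separate lightweight threads could look roughly like the sketch below. It is not part of the measured code: the names worker, collect and run are made up for illustration, and I assume one collector thread per worker process, with the worker echoing the None signal back so its collector knows when to stop.

```python
import multiprocessing as mp
import threading


def worker(in_q, out_q):
    # Same trivial job as above: add 1 to the number in each task.
    while True:
        task = in_q.get()
        if task is None:          # None is the shutdown signal
            out_q.put(None)       # pass it on so the collector thread stops too
            break
        out_q.put({'n': task['n'] + 1})


def collect(out_q, results):
    # Runs in a thread of the main process and drains one worker's output queue.
    while True:
        item = out_q.get()
        if item is None:
            break
        results.append(item['n'])


def run(num_workers=2, tasks_per_worker=100):
    workers = []
    for _ in range(num_workers):
        in_q, out_q = mp.Queue(), mp.Queue()
        results = []
        proc = mp.Process(target=worker, args=(in_q, out_q))
        thread = threading.Thread(target=collect, args=(out_q, results))
        proc.start()
        thread.start()
        workers.append((in_q, proc, thread, results))

    # Sending never waits for results; the collector threads pick them up.
    for n in range(tasks_per_worker):
        for in_q, _, _, _ in workers:
            in_q.put({'n': n})
    for in_q, _, _, _ in workers:
        in_q.put(None)

    # Join the collector first so the output queue is empty before joining the process.
    for _, proc, thread, _ in workers:
        thread.join()
        proc.join()
    return [results for _, _, _, results in workers]


if __name__ == '__main__':
    per_worker = run()
    print([len(r) for r in per_worker])
```

With this layout the main thread only sends, so a slow worker delays its own collector thread and nothing else.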
I also measured timings for sending just 1 task at a time (instead of about 1000 tasks at a time) to all processes and receiving 1 result at a time. In this case the delay is 460 µs (microseconds). So you can think of 460 µs as the pure delay of Queue in the worst case of using it (the 460 µs include both send and recv).
I've taken your example snippet and modified it a bit to use Queue instead of Pipe, and got a delay of about 0.1 ms.
Notice that I do this in a loop 5 times because the first or second try initializes some Queue-related stuff.
import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":
    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()
    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()
        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()
Output:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
Also, running your own example in a loop several times makes the second and later send/recv iterations much faster than the first one.
The first iteration is very slow because resources are initialized lazily. Many algorithms are lazily initialized, meaning that they allocate the resources they need only on the first call. This avoids unnecessary allocation when the algorithm is never used. On the other hand, it makes the first call much slower, so you have to make a few empty calls first to warm up a lazy algorithm.
import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()
    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()
        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
Output:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
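If you want to keep the slow first iterations out of your numbers, one option is to discard them explicitly. The sketch below is only an illustration: echo and measure_round_trip are names I made up, the READY handshake is my own assumption about how to wait for the child, and unlike the snippet above it times the full round trip in the parent with time.perf_counter_ns instead of comparing clocks across two processes.

```python
import multiprocessing as mp
import statistics
import time


def echo(conn, rounds):
    conn.send('READY')            # handshake: the child is up and running
    for _ in range(rounds):
        conn.send(conn.recv())    # echo each message straight back


def measure_round_trip(rounds=200, warmup=20):
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=echo, args=(child_conn, rounds + warmup))
    p.start()
    parent_conn.recv()            # blocks until the child reports READY

    samples_us = []
    for i in range(rounds + warmup):
        t0 = time.perf_counter_ns()
        parent_conn.send('START')
        parent_conn.recv()
        dt = time.perf_counter_ns() - t0
        if i >= warmup:           # discard the warm-up iterations
            samples_us.append(dt / 1_000)
    p.join()
    return samples_us


if __name__ == '__main__':
    samples = measure_round_trip()
    print(f'median round trip: {statistics.median(samples):.1f} us')
```

Reporting the median rather than the mean keeps a single slow outlier from dominating the result.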
The following program sends a simple object through a pipe 1 million times and measures the total elapsed time in seconds and average send time in milliseconds. I am running on a fairly old Windows desktop, an Intel(R) Core(TM) i7-4790 CPU @ 3.60 GHz:
from multiprocessing import Pipe, Process
import time

class Message:
    def __init__(self, text):
        self.text = text

N = 1_000_000

def worker(recv_connection):
    for _ in range(N):
        msg = recv_connection.recv()

def main():
    recv_connection, send_connection = Pipe(duplex=False)
    p = Process(target=worker, args=(recv_connection,))
    p.start()
    msg = Message('dummy')
    start_time = time.time_ns()
    for _ in range(N):
        send_connection.send(msg)
    p.join()
    elapsed = time.time_ns() - start_time
    print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
    print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')

if __name__ == '__main__':
    main()
Prints:
Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.
This is roughly 10,000 times faster than what you are achieving (100 ms), so I can only conclude that the slowdown must come from the complexity of the object that you are sending through the pipe.
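One way to check this on your side is to measure how expensive your object is to pickle, since send() has to pickle whatever you pass it. The sketch below is only a rough approximation: pickle_cost is a helper name I made up, plain pickle.dumps stands in for what the connection does internally, and the "frame" is just a zero-filled bytes object the size of a raw 1920x1080 RGB image, not your actual data.

```python
import pickle
import time


def pickle_cost(obj, repeats=20):
    """Return (pickled size in bytes, average seconds per pickle.dumps call)."""
    start = time.perf_counter()
    for _ in range(repeats):
        data = pickle.dumps(obj)
    elapsed = time.perf_counter() - start
    return len(data), elapsed / repeats


if __name__ == '__main__':
    # Stand-in for the small message used in the benchmark above.
    small = {'text': 'dummy'}
    # Hypothetical stand-in for a raw 1920x1080 RGB frame (about 6 MB).
    frame = bytes(1920 * 1080 * 3)
    for name, obj in (('small message', small), ('raw frame', frame)):
        size, seconds = pickle_cost(obj)
        print(f'{name}: {size} bytes, {seconds * 1e3:.3f} ms per dumps')
```

If your real object comes out at many megabytes or takes milliseconds to pickle, that cost is paid on every send and again on every recv.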
Update
You do want to use multiprocessing, but I would suggest a multiprocessing pool, specifically a multiprocessing.pool.Pool instance used in conjunction with its imap method. This lets you write a generator function that yields the next frame to be processed. Each frame is submitted to the pool, and the processed frames are returned to the main process as they become available, in the order in which the frames were submitted. The following outlines the basic idea:
from multiprocessing import Pool, cpu_count
import time

def process_frame(frame):
    # return processed frame
    time.sleep(.1)
    return frame.upper()

def generate_frames_for_processing():
    for i in range(100):
        time.sleep(.033)
        yield f'msg{i}'

def main():
    # Leave a processor for the main process:
    pool = Pool(cpu_count() - 1)
    start_time = time.time()
    # get processed results as they become available, in the order the frames were submitted:
    for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
        # Do something with returned processed frame
        # These will be in the same order as the frames are submitted
        ...
        print(processed_frame)
    pool.close()
    pool.join()
    print('Elapsed:', time.time() - start_time)

if __name__ == '__main__':
    main()
Prints:
MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282
You can specify a chunksize argument on the imap call, but you probably do not want to. See the documentation for details.
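For completeness, here is a minimal sketch of what passing chunksize looks like. The function names square and squares_with_chunksize are made up for the example. As far as I understand it, with chunksize=n the pool takes n items from the iterable and hands them to a worker as a single task, so with a live frame source nothing would be dispatched until n frames have arrived, which is probably why you would not want it here.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def squares_with_chunksize(chunksize):
    with Pool(2) as pool:
        # Results still come back in submission order, whatever the chunksize.
        return list(pool.imap(square, range(10), chunksize=chunksize))


if __name__ == '__main__':
    print(squares_with_chunksize(1))
    print(squares_with_chunksize(4))
```

Both calls return the same list; chunksize only changes how the work is grouped on its way to the workers.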