只是为您编写了一种可能的解决方案,使用多重处理 https://docs.python.org/3/library/multiprocessing.html#process-and-exceptions物体Process https://docs.python.org/3/library/multiprocessing.html#process-and-exceptions and Queue https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.
我测量了它的吞吐速度,平均需要150 mcs
(微秒)来处理一项几乎不执行任何操作的任务。处理只是从任务中获取整数,加 1 并将其发送回。我认为 150 微秒的延迟应该完全足以让你处理 30 FPS。
使用队列代替管道,因为我认为它更适合多任务处理。而且,如果您的时间测量很精确,那么队列也是660x
比 Pipe 快几倍(150 微秒与 100 毫秒延迟相比)。
您可以注意到,处理循环批量发送任务,这意味着它首先向所有进程发送许多任务,然后才收集所有已发送和已处理的任务。与一次仅发送 1 个任务然后收集很少的结果相比,这种批处理使处理更加流畅。
如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻塞等待最慢的进程完成任务。
通过发送信号通知进程完成并退出None
任务交给他们。
在线尝试一下! https://replit.com/@moytrage/StackOverflow70784547#main.py
def process(idx, in_q, out_q):
while True:
task = in_q.get()
if task is None:
break
out_q.put({'n': task['n'] + 1})
def main():
import multiprocessing, time
queue_size = 1 << 16
procs = []
for i in range(multiprocessing.cpu_count()):
in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
procs.append({
'in_q': in_q,
'out_q': out_q,
'proc': multiprocessing.Process(target = process,
kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
})
procs[-1]['proc'].start()
num_blocks = 1 << 2
block = 1 << 10
assert block <= queue_size
tb = time.time()
for k in range(num_blocks):
# Send tasks
for i in range(block):
for j, proc in enumerate(procs):
proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
# Receive tasks results
for i in range(block):
for proc in procs:
proc['out_q'].get()
print('Processing speed:', round((time.time() - tb) /
(num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
# Send finish signals to processes
for proc in procs:
proc['in_q'].put(None)
# Join processes (wait for exit)
for proc in procs:
proc['proc'].join()
if __name__ == '__main__':
main()
Output:
Processing speed: 150.7 mcs per task
还测量了一次仅向所有进程发送 1 个任务(而不是一次 1000 个任务)以及一次接收 1 个任务的时间。在这种情况下,延迟是460 mcs
(微秒)。因此,您可以认为,在使用队列的最坏情况下,队列的纯延迟为 460 mcs(460 mcs 包括发送和接收)。
我已经采用了您的示例片段并对其进行了一些修改以使用队列而不是管道,并且得到0.1 ms
delay.
请注意,我在循环中执行此操作 5 次,因为第一次或第二次尝试初始化了一些与队列相关的内容。
在线尝试一下! https://replit.com/@moytrage/StackOverflow70784547var2#main.py
import multiprocessing as mp
import time
def proc(inp_q, out_q):
for i in range(5):
e = inp_q.get()
ts = float(time.time_ns())
out_q.put(ts)
if __name__ == "__main__":
inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
p1 = mp.Process(target=proc, args=(inp_q, out_q))
p1.start()
for i in range(5):
ts = float(time.time_ns())
inp_q.put("START")
ts_end = out_q.get()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
p1.join()
Output:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
此外,在循环中运行示例多次会使第二次和其他发送/接收迭代比第一次快得多。
由于延迟初始化资源,第一次非常慢。大多数算法是延迟初始化 https://en.wikipedia.org/wiki/Lazy_initialization,这意味着它们仅在第一次调用时分配所有需要的资源。这是为了防止根本不使用算法时不必要的分配。另一方面,这使得首次调用变得更慢,因此您必须执行几次首次空调用来预热惰性算法。
在线尝试一下! https://tio.run/##hVDNSsQwEL7nKYaekmWtLqJIoQffQLT3ENq0O7iZhmQURHz2mnajm5POIYSZ72e@8R98nOn2wYdlQefnwODeTow@zL2NEWkCE8F5kYeMzgox2BFWhOyPeBp0PxOpRkCqcQ6AgATB0GTlXW6vdcHWwfbvUv1OOEK7KdfroykWs4IVLQ2SoxICR9CaTMJqaFuotHYGSeuqERvRm2CJN9a@UEguztdP6G028IfcOqeVbMJkuV2j7SH9Y1sk3KsfUh0TkJPGf5H/ClbseE5WvXSPz11VnkWnflIoofl0F5mAxHKsumQAbF4trau42MCnzAJXSUldy8PNbnevvpLBsnwD
import multiprocessing as mp
import time
def proc(child_conn):
for i in range(5):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
for i in range(5):
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
Output:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021