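The main reason you are not seeing what you expect is this line:

r = p.map(connect_db, ())

You are calling Pool.map with an empty iterable, so connect_db is never called at all. Because nothing runs, no exception is raised, you never reach the except block, the pool is never closed, and so on.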
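To see this in isolation, here is a minimal sketch. It uses multiprocessing.pool.ThreadPool instead of Pool purely so that the parent can observe which calls happened (ThreadPool is a subclass of Pool and inherits map from it). The calls list and the run_map helper are names I made up for the demo, not part of your code.

```python
from multiprocessing.pool import ThreadPool

calls = []  # every argument connect_db is invoked with (demo-only bookkeeping)


def connect_db(i):
    calls.append(i)
    raise Exception("Failed to connect")


def run_map(iterable):
    """Map connect_db over iterable and return (result, error)."""
    with ThreadPool(4) as pool:
        try:
            return pool.map(connect_db, iterable), None
        except Exception as exc:
            return None, exc


# Empty iterable: nothing is scheduled, so nothing can fail.
empty_result, empty_error = run_map(())
calls_after_empty = list(calls)
print(empty_result, empty_error, calls_after_empty)  # [] None []

# Non-empty iterable: connect_db runs and its exception reaches the caller.
full_result, full_error = run_map(range(4))
print(full_result, full_error)  # None Failed to connect
```

The first call returns an empty list without ever entering connect_db, which is exactly why the except block in your loop was never reached.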
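Here is a working skeleton with a bunch of print statements for debugging. I have attached the output below; as you can see, each round has four child processes.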

import multiprocessing
import time
import random


def connect_db(i):
    print(f"Trying to connect {i}")
    time.sleep(random.random() * 2)
    raise Exception("Failed to connect")


while True:
    p = multiprocessing.Pool(4)
    print("active children are:")
    for idx, process in enumerate(multiprocessing.active_children()):
        print(f"Child number {idx} is {process.name}")  # why is the name incremented by 1 each time while loop iterates?
    try:
        print("About to create a pool")
        # A non-empty iterable, so connect_db really runs in the workers
        r = p.map(connect_db, range(4))
        print("Created a pool")
    except Exception as e:
        # map re-raises the worker's exception in the parent process
        print(e)
        print("terminating threads")
        p.terminate()
    p.close()
    p.join()
    time.sleep(5)

Output:
active children are:
Child number 0 is ForkPoolWorker-2
Child number 1 is ForkPoolWorker-1
Child number 2 is ForkPoolWorker-4
Child number 3 is ForkPoolWorker-3
About to create a pool
Trying to connect 0
Trying to connect 1
Trying to connect 2
Trying to connect 3
Failed to connect
terminating threads
active children are:
Child number 0 is ForkPoolWorker-5
Child number 1 is ForkPoolWorker-6
Child number 2 is ForkPoolWorker-8
Child number 3 is ForkPoolWorker-7
About to create a pool
Trying to connect 0
Trying to connect 1
...
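
As for the question in the code comment about why the worker number keeps growing (1 to 4 in the first round, 5 to 8 in the second): as far as I can tell, multiprocessing numbers new processes from a single counter in the parent that is never reset, and every new Pool creates four fresh processes. A small sketch that shows the counter advancing without starting anything (noop is just a placeholder I added):

```python
import multiprocessing


def noop():
    """Placeholder target; the processes below are never started."""


# Creating a Process object is enough to take the next number from the counter.
first_round = [multiprocessing.Process(target=noop).name for _ in range(4)]
second_round = [multiprocessing.Process(target=noop).name for _ in range(4)]

print(first_round)
print(second_round)  # continues where first_round stopped
```

The second list picks up where the first one stopped, just like the ForkPoolWorker numbers in the output above.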
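
One last point: if the use case really is database connections, there are ready-made connection pools, and you should probably use one of them. Also, I am not sure whether a database connection can be shared across processes.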
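
If you do end up needing a connection inside pool workers, one common way to sidestep the sharing question is to let each worker open its own connection in a Pool initializer. A rough sketch, assuming sqlite3 as a stand-in for your real database; init_worker, square_in_db and the in-memory database are placeholders of mine, and the fork context is Unix-only:

```python
import multiprocessing
import os
import sqlite3

_conn = None  # one connection per worker process, set by init_worker


def init_worker(db_path):
    """Runs once in each worker: open a connection owned by that process."""
    global _conn
    _conn = sqlite3.connect(db_path)


def square_in_db(i):
    """Run a trivial query on this worker's own connection."""
    (value,) = _conn.execute("SELECT ? * ?", (i, i)).fetchone()
    return os.getpid(), value


if __name__ == "__main__":
    # "fork" matches the ForkPoolWorker names above (assumption: Unix host).
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(4, initializer=init_worker, initargs=(":memory:",)) as pool:
        for pid, value in pool.map(square_in_db, range(8)):
            print(f"worker {pid} computed {value}")
```

Nothing connection-related crosses a process boundary here; only the integers and the results are pickled.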
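
Controlling the process names in the pool

If, for some reason, you want to control the names of the processes in the pool, you can do so by creating your own pool context: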

import multiprocessing
from multiprocessing import context
import time
import random

process_counter = 0


class MyForkProcess(multiprocessing.context.ForkProcess):
    def __init__(self, *args, **kwargs):
        global process_counter
        # Pool replaces "Process" with "PoolWorker" in worker names,
        # so these show up as MyForkPoolWorker-N in the output.
        name = f"MyForkProcess-{process_counter}"
        process_counter += 1
        super().__init__(*args, name=name, **kwargs)


class MyContext(multiprocessing.context.ForkContext):
    _name = 'MyForkContext'
    Process = MyForkProcess


def connect_db(i):
    print(f"Trying to connect {i}")
    cp = multiprocessing.current_process()
    print(f"The name of the child process is {cp.name}")
    time.sleep(random.random() * 2)
    raise Exception("Failed to connect")


context = MyContext()
while True:
    p = context.Pool(4)
    print("active children are:")
    for idx, process in enumerate(multiprocessing.active_children()):
        print(f"Child number {idx} is {process.name}")  # why is the name incremented by 1 each time while loop iterates?
    try:
        print("About to create a pool")
        r = p.map(connect_db, range(4))
        print("Created a pool")
    except Exception as e:
        print(e)
        print("terminating threads")
        p.terminate()
        # Restart the numbering so the next pool's workers are named 0-3 again
        process_counter = 0
    p.close()
    p.join()
    time.sleep(5)

The output is now:
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-3
Child number 3 is MyForkPoolWorker-1
About to create a pool
Trying to connect 0
The name of the child process is MyForkPoolWorker-0
Trying to connect 1
The name of the child process is MyForkPoolWorker-1
Trying to connect 2
The name of the child process is MyForkPoolWorker-2
Trying to connect 3
The name of the child process is MyForkPoolWorker-3
Failed to connect
terminating threads
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-1
Child number 3 is MyForkPoolWorker-3
About to create a pool
...