Issue
您的代码中的主要问题是每个Worker
opens ips.txt
从头开始并适用于中找到的每个 URLips.txt
。于是五个工人一起打开ips.txt
五次,每个 URL 工作五次。
Solution
解决这个问题的正确方法是将代码拆分为master and worker。您已经实现了大部分工作代码。让我们来看看主要部分(在if __name__ == '__main__':
)现在作为主人。
现在master应该启动五个worker并通过队列向他们发送工作(multiprocessing.Queue
).
The multiprocessing.Queue
类提供了一种方法,让多个生产者可以将数据放入其中,多个消费者可以从中读取数据,而不会遇到竞争条件。此类实现了所有必要的锁定语义,以便在多处理上下文中安全地交换数据并防止竞争条件。
固定码
以下是如何按照我上面描述的方式重写您的代码:
import warnings
import requests
import multiprocessing
from colorama import init
init(autoreset=True)
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)
from bs4 import BeautifulSoup as BS
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
class Worker(multiprocessing.Process):
def __init__(self, job_queue):
super().__init__()
self._job_queue = job_queue
def run(self):
while True:
url = self._job_queue.get()
if url is None:
break
req = url.strip()
try:
page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
timeout=10)
soup = BS(page.text)
# string = string.encode('ascii', 'ignore')
print('\033[32m' + req + ' - Title: ', soup.title)
except requests.RequestException as e:
print('\033[32m' + req + ' - TimeOut!')
if __name__ == '__main__':
jobs = []
job_queue = multiprocessing.Queue()
for i in range(5):
p = Worker(job_queue)
jobs.append(p)
p.start()
# This is the master code that feeds URLs into queue.
with open('ips.txt', 'r') as urls:
for url in urls.readlines():
job_queue.put(url)
# Send None for each worker to check and quit.
for j in jobs:
job_queue.put(None)
for j in jobs:
j.join()
我们在上面的代码中可以看到master打开ips.txt
一旦,从其中一一读取 URL 并将它们放入队列中。每个工作人员都会等待 URL 到达此队列。一旦 URL 到达队列,其中一个工作人员就会拿起它并开始忙碌。如果队列中有更多 URL,则下一个空闲工作人员会选择下一个,依此类推。
最后,我们需要某种方式让工人在所有工作完成后退出。有多种方法可以实现这一目标。在此示例中,我选择了发送五个标记值(五个None
值(在这种情况下)放入队列中,每个工作人员一个,以便每个工作人员都可以拿起它并退出。
还有另一种策略,工人和主人共享一个multiprocessing.Event
对象就像他们共享一个multiprocessing.Queue
现在就反对。主机调用set()
每当它希望工作人员退出时,都会调用该对象的方法。工作人员检查该对象是否is_set()
然后退出。但是,这会给代码带来一些额外的复杂性。我在下面讨论过这个问题。
为了完整起见,也为了演示最小、完整和可验证的示例,我在下面提供了两个代码示例,它们显示了两种停止策略。
使用哨兵值阻止工人
到目前为止,这与我上面描述的内容差不多,只是代码示例已被大大简化,以消除对 Python 标准库之外的任何库的依赖。
在下面的示例中值得注意的另一件事是,我们没有创建工作类,而是使用工作函数并创建一个Process
出来了。这种类型的代码经常在 Python 文档中找到,而且非常惯用。
import multiprocessing
import time
import random
def worker(input_queue):
while True:
url = input_queue.get()
if url is None:
break
print('Started working on:', url)
# Random delay to simulate fake processing.
time.sleep(random.randint(1, 3))
print('Stopped working on:', url)
def master():
urls = [
'https://example.com/',
'https://example.org/',
'https://example.net/',
'https://stackoverflow.com/',
'https://www.python.org/',
'https://github.com/',
'https://susam.in/',
]
input_queue = multiprocessing.Queue()
workers = []
# Create workers.
for i in range(5):
p = multiprocessing.Process(target=worker, args=(input_queue, ))
workers.append(p)
p.start()
# Distribute work.
for url in urls:
input_queue.put(url)
# Ask the workers to quit.
for w in workers:
input_queue.put(None)
# Wait for workers to quit.
for w in workers:
w.join()
print('Done')
if __name__ == '__main__':
master()
使用事件来停止 Worker
使用multiprocessing.Event
对象在工人应该退出时发出信号会在代码中引入一些复杂性。主要需要进行三项更改:
- 在master中,我们调用
set()
方法上的Event
反对发出工人应尽快辞职的信号。
- 在worker中,我们调用
is_set()
的方法Event
定期反对以检查是否应该退出。
- 在master中,我们需要使用
multiprocessing.JoinableQueue
代替multiprocessing.Queue
这样它就可以在要求工作人员退出之前测试队列是否已被工作人员完全消耗掉。
- 在worker中,我们需要调用
task_done()
消耗队列中的每个项目后队列的方法。这对于主机调用是必要的join()
方法来测试队列是否已被清空。
所有这些更改都可以在下面的代码中找到:
import multiprocessing
import time
import random
import queue
def worker(input_queue, stop_event):
while not stop_event.is_set():
try:
# Check if any URL has arrived in the input queue. If not,
# loop back and try again.
url = input_queue.get(True, 1)
input_queue.task_done()
except queue.Empty:
continue
print('Started working on:', url)
# Random delay to simulate fake processing.
time.sleep(random.randint(1, 3))
print('Stopped working on:', url)
def master():
urls = [
'https://example.com/',
'https://example.org/',
'https://example.net/',
'https://stackoverflow.com/',
'https://www.python.org/',
'https://github.com/',
'https://susam.in/',
]
input_queue = multiprocessing.JoinableQueue()
stop_event = multiprocessing.Event()
workers = []
# Create workers.
for i in range(5):
p = multiprocessing.Process(target=worker,
args=(input_queue, stop_event))
workers.append(p)
p.start()
# Distribute work.
for url in urls:
input_queue.put(url)
# Wait for the queue to be consumed.
input_queue.join()
# Ask the workers to quit.
stop_event.set()
# Wait for workers to quit.
for w in workers:
w.join()
print('Done')
if __name__ == '__main__':
master()