如何将多处理与请求模块一起使用?

2023-11-22

我是 python 的新开发者。我的代码是下面的代码:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('\033[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('\033[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

我正在尝试让程序读取IPs.txt并打印出每个网站的标题。

它在单线程中完美运行。现在我想通过使用使它更快multiprocessing.

但由于某种原因它只输出同一行 5 次。我是多处理新手,并尽力尝试失败。

显示问题的屏幕截图:

screen shot showing problem

我只想运行 5 个工人来检查IPs.txt在多线程或并行中......我只是想让它更快。

有什么提示、线索、帮助吗?


Issue

您的代码中的主要问题是每个Worker opens ips.txt从头开始并适用于中找到的每个 URLips.txt。于是五个工人一起打开ips.txt五次,每个 URL 工作五次。

Solution

解决这个问题的正确方法是将代码拆分为master and worker。您已经实现了大部分工作代码。让我们来看看主要部分(在if __name__ == '__main__':)现在作为主人。

现在master应该启动五个worker并通过队列向他们发送工作(multiprocessing.Queue).

The multiprocessing.Queue类提供了一种方法,让多个生产者可以将数据放入其中,多个消费者可以从中读取数据,而不会遇到竞争条件。此类实现了所有必要的锁定语义,以便在多处理上下文中安全地交换数据并防止竞争条件。

固定码

以下是如何按照我上面描述的方式重写您的代码:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                soup = BS(page.text)
                # string = string.encode('ascii', 'ignore')
                print('\033[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                print('\033[32m' + req + ' - TimeOut!')


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()

我们在上面的代码中可以看到master打开ips.txt一旦,从其中一一读取 URL 并将它们放入队列中。每个工作人员都会等待 URL 到达此队列。一旦 URL 到达队列,其中一个工作人员就会拿起它并开始忙碌。如果队列中有更多 URL,则下一个空闲工作人员会选择下一个,依此类推。

最后,我们需要某种方式让工人在所有工作完成后退出。有多种方法可以实现这一目标。在此示例中,我选择了发送五个标记值(五个None值(在这种情况下)放入队列中,每个工作人员一个,以便每个工作人员都可以拿起它并退出。

还有另一种策略,工人和主人共享一个multiprocessing.Event对象就像他们共享一个multiprocessing.Queue现在就反对。主机调用set()每当它希望工作人员退出时,都会调用该对象的方法。工作人员检查该对象是否is_set()然后退出。但是,这会给代码带来一些额外的复杂性。我在下面讨论过这个问题。

为了完整起见,也为了演示最小、完整和可验证的示例,我在下面提供了两个代码示例,它们显示了两种停止策略。

使用哨兵值阻止工人

到目前为止,这与我上面描述的内容差不多,只是代码示例已被大大简化,以消除对 Python 标准库之外的任何库的依赖。

在下面的示例中值得注意的另一件事是,我们没有创建工作类,而是使用工作函数并创建一个Process出来了。这种类型的代码经常在 Python 文档中找到,而且非常惯用。

import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()

使用事件来停止 Worker

使用multiprocessing.Event对象在工人应该退出时发出信号会在代码中引入一些复杂性。主要需要进行三项更改:

  • 在master中,我们调用set()方法上的Event反对发出工人应尽快辞职的信号。
  • 在worker中,我们调用is_set()的方法Event定期反对以检查是否应该退出。
  • 在master中,我们需要使用multiprocessing.JoinableQueue代替multiprocessing.Queue这样它就可以在要求工作人员退出之前测试队列是否已被工作人员完全消耗掉。
  • 在worker中,我们需要调用task_done()消耗队列中的每个项目后队列的方法。这对于主机调用是必要的join()方法来测试队列是否已被清空。

所有这些更改都可以在下面的代码中找到:

import multiprocessing
import time
import random
import queue


def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Check if any URL has arrived in the input queue. If not,
            # loop back and try again.
            url = input_queue.get(True, 1)
            input_queue.task_done()
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将多处理与请求模块一起使用? 的相关文章

  • Python 函数句柄 ala Matlab

    在 MATLAB 中可以创建function handles http www mathworks co uk help techdoc ref function handle html与类似的东西 myfun arglist body 这
  • UnicodeDecodeError:“utf-8”编解码器无法解码位置 14 中的字节 0xb9:起始字节无效

    我正在使用 Django REST 进行文件上传测试 Python3 6 2Django1 11djangorest框架 3 6 4Excel OSX 15 38 170902 操作系统 10 12 6 过去使用普通照片文件可以成功完成此操
  • Visual Studio Code:如何使用参数调试 Python 脚本

    我正在使用 Visual Studio Code 来调试 Python 脚本 下列的本指南 https code visualstudio com docs python debugging 我在中设置了参数launch json file
  • 从 Python 将分层 JSON 数据写入 Excel xls?

    我想将一些数据从 python 写入 xlsx 我目前将其存储为 JSON 但它从 Python 中输出什么并不重要 单个文章的 JSON 如下所示 Word Count 50 Key Words Blah blah blah Foo Fr
  • 如何使用 python http.server 运行 CGI“hello world”

    我使用的是 Windows 7 和 Python 3 4 3 我想在浏览器中运行这个简单的 helloworld py 文件 print Content Type text html print print print print h2 H
  • 更改 numpy 数组的结构强制给定值

    如何缩小栅格数据的比例4 X 6大小成2 X 3如果 2 2 像素内的任何元素包含 1 则大小强制选择 1 否则选择 0 import numpy as np data np array 0 0 1 1 0 0 1 0 0 1 0 0 1
  • Django 说“id 可能不为 NULL”,但为什么会这样呢?

    我今天要疯了 我只是尝试插入一条新记录 但它返回了 post blogpost id 可能不为 NULL 错误 这是我的模型 class BlogPost models Model title models CharField max le
  • 如何解决CDK CLI版本不匹配的问题

    我收到以下错误 此 CDK CLI 与您的应用程序使用的 CDK 库不兼容 请将CLI升级到最新版本 云程序集架构版本不匹配 支持的最大架构版本为 8 0 0 但发现为 9 0 0 发出后cdk diff命令 我确实跑了npm instal
  • Scapy:如何将新层(802.1q)插入现有数据包?

    我有一个数据包转储 想要将 VLAN 标记 802 1q 标头 注入到数据包中 怎么做 为了找到答案 我查看了Scapy 插入新层和记录问题 https stackoverflow com q 17259592 1381638 这确实很有帮
  • 如何不断地将 STDOUT 发送到我的 python TCP 服务器?

    我有简单的 python echo 服务器 它使用套接字 并向客户端回显随机数 我有另一个程序 每 2 秒将值打印到标准输出 如果它只是一个脚本 我可以像这样重定向 stdout python script py 并像这样在脚本中获取它da
  • Python 中使用 globals() 的原因?

    Python 中有 globals 函数的原因是什么 它只返回全局变量的字典 这些变量已经是全局的 所以它们可以在任何地方使用 我只是出于好奇而问 试图学习Python def F global x x 1 def G print glob
  • telethon 库:如何通过电话号码添加用户

    我正在研究 Telegram 的 Telethon 库 它可以使用 Telegram API 充当 Telegram 客户端 重要提示 这是电报客户端 API https core telegram org telegram api 而不是
  • TypeError:“NoneType”对象不可下标[重复]

    这个问题在这里已经有答案了 错误 names curfetchone 0 TypeError NoneType object is not subscriptable 我尝试检查缩进 但仍然有错误 我读到 如果数据库中没有文件名记录 变量名
  • Python Tkinter 网格复选框

    我想知道是否有一种简单的方法可以使用 Tkinter 创建复选框网格 我正在尝试制作一个由 10 行和 10 列 即 100 个复选框 组成的网格 以便每行只能选择两个复选框 编辑 我正在使用带有spyder的python 2 7 到目前为
  • Django:显示管理员验证错误的自定义错误消息

    我正在使用 Django 1 2 4 我有一个模型 其中有一个需要验证的字段 当验证失败时 我想向用户显示自定义错误消息 模型编辑是在管理界面中完成的 这就是我目前正在做的事情 def clean fields self exclude N
  • 列表中的“u”是什么意思?

    这是我第一次遇到这种情况 刚刚打印了一个列表 每个元素似乎都有一个u在它前面 即 u hello u hi u hey 它是什么意思 为什么列表的每个元素前面都会有这个 由于我不知道这种情况有多常见 如果您想了解我是如何遇到它的 我会很乐意
  • 给定一个字符串,如何删除所有重复的连续字母?

    如何从字符串中删除两个连续的字母 例如 a str hii thherre 应该成为 hi there 我尝试这样做 a str join sorted set a str key a str index 但是 我得到 hi ter 是的
  • Matplotlib:检查空图

    我有一个循环加载并绘制一些数据 如下所示 import os import numpy as np import matplotlib pyplot as plt for filename in filenames plt figure i
  • 具有行业级约束的 SciPy 投资组合优化

    尝试在这里优化投资组合权重分配 通过限制风险来最大化我的回报函数 我可以毫无问题地通过简单的约束 所有权重之和等于 1 找到产生我的回报函数的优化权重 并做出另一个约束 即我的总风险低于目标风险 我的问题是 如何为每个组添加行业权重界限 我
  • 使用 python/scipy 进行 voronoi 和 lloyd 松弛

    如何使用 Qhull 确定哪些 voronoi 单元 按索引 是 正确的 由 现有顶点 组成 我正在尝试使用 LLoyds 算法和 scipy spatial Voronoi 它是 Qhull 的包装器 生成的输入来执行约束松弛 就代码而言

随机推荐

  • CXF 客户端代理线程安全吗?

    我正在使用 CXF 生成 SOAP 客户端类 在里面CXF 文档 他们写 JAX WS 客户端代理线程安全吗 JAX WS 官方答案 不 根据 JAX WS 规范 客户端代理不是线程安全的 要编写可移植代码 您应该将它们视为非线程安全并同步
  • 嵌套对象的远程 ViewModel 验证不起作用

    我有一个类用户 如下所示 public class User public int UserId get set Required ErrorMessage A username is required StringLength 20 Er
  • python中按多个条件排序

    我是编程新手 现在我正在用 python 编写排行榜 我想按第一积分对我的联赛进行排序 如果有两支球队积分相同 我想按净胜球对它们进行排序 如果它们有相同的净胜球 我想按名称排序 第一个条件非常简单 并且按以下方式工作 table sort
  • 使用 jQuery 查找下载链接后面的文件大小

    我想知道是否有一种方法可以使用 jQuery 来找出我链接到网页的 PDF 的文件大小 我想让鼠标悬停在下载链接上时 会弹出一个模式框 显示 PDF 的文件大小 我可以做第二位 我唯一想知道的是如何找出文件大小 我不知道 jQuery 是否
  • AWS beanstalk中的worker-tier和web-tier有什么区别

    在阅读文档时 我开始了解 AWS 中的这两层环境 但找不到它们之间的任何比较 文档中的建议是 应该为长时间运行的任务选择工作环境 以提高 Web 层的响应能力 我有几个问题来澄清我的疑惑 两层有何不同 关于执行不同的操作 每个操作中可用的服
  • 如何在 Spring 中自动装配泛型类型 的 Bean?

    我有一颗豆子Item
  • 我可以使用Spring5的WebClient返回的Flux的block()方法吗?

    我创建了 Spring Boot 2 0 演示应用程序 其中包含两个使用 WebClient 进行通信的应用程序 当我从 WebClient 的响应中使用 Flux 的 block 方法时 他们经常停止通信 这让我很痛苦 由于某些原因 我想
  • C++ 引用与返回值

    我理解引用的原则是避免复制大型结构 但是如果您正在编写的函数本身创建了大型结构怎么办 与将目标对象作为引用传递并从函数内部填充相比 在本地创建变量然后返回它是否效率较低 或者更有可能耗尽内存 我似乎无法很好地表达 所以一个具体的例子 假设一
  • JS 对象键带引号还是不带引号? [复制]

    这个问题在这里已经有答案了 可能的重复 带引号和不带引号的对象键有什么区别 感兴趣吗 什么是正确的方法 是否将对象键写在引号中 那是 var obj name Jhon or var obj name Jhon 例如 从 php 代码ech
  • Git 中的 HEAD 和 ORIG_HEAD

    这些符号代表什么以及它们的含义是什么 我在官方文档中找不到任何解释 HEAD是 直接或间接 即符号 对当前提交的引用 这是您已在工作目录中签入的提交 除非您进行了一些更改或同等更改 并且是在 git commit 之上创建新提交的提交 通常
  • 无法在具有 @objc 属性的协议中使用自定义类?

    我正在尝试创建一个用于 JSON 加载委托的协议 JSONLoaderDelegate 我的另一堂课叫做JSONLoader 应该将事件分派给它的委托 实现JSONLoaderDelegate协议 如 self delegate jsonL
  • Laravel 8 - 找不到驱动程序:Illuminate\Database\QueryException 无法找到驱动程序(SQL:从 `list` 中选择 *)

    我已经在我的 Linux Mint 20 上安装了 Laravel 8 作为我的个人实验 所以我对 Laravel 的新版本很陌生 我搜索了许多来源如何使用 CRUD 方法显示表 以便该表显示在网络中 其中包含来自 MySQL 数据库的数据
  • Spring data JPA:在结果元组中找不到别名!执行自定义查询时出错

    我正在尝试使用 mysql 数据库执行自定义查询 Queryspring data jpa 的注解 该表是 Field Type Null Key Default Extra id decimal 10 0 NO PRI NULL firs
  • iOS中如何实现弹出对话框?

    计算后 我想显示一个弹出窗口或警报框 向用户传达消息 有谁知道我在哪里可以找到有关此的更多信息 Yup a UIAlertView可能就是您正在寻找的 这是一个例子 UIAlertView alert UIAlertView alloc i
  • std::bind 创建的函子住在哪里?

    函数指针可以指向自由函数 函数对象 成员函数调用的包装器等任何内容 但是 std bind 创建的函子可以有状态 也可以有自定义创建的函子 该状态分配在哪里 谁在删除它 考虑下面的例子 当向量被删除时 状态 数字10 会被删除吗 谁知道在函
  • 计算GPS坐标以形成给定大小的半径

    我想出了一种方法 它接受坐标和范围 以英里为单位 并返回围绕原点形成圆圈的坐标列表 我似乎已经取得了一些进展 但我在降低范围部分方面遇到了问题 private const Double LAT MILE 0 0144839 private
  • 仅使用 XAML 在左键单击时显示上下文菜单

    WPF 的默认行为ContextMenu是当用户右键单击时显示它 我想要ContextMenu当用户单击鼠标左键时显示 看起来这应该是一个简单的属性ContextMenu 但事实并非如此 我操纵了它 这样我就可以处理LeftMouseBut
  • 为什么匿名函数表达式和命名函数表达式的初始化如此不同?

    我正在看第13条或 ECMAScript 规范 v 5 匿名函数表达式的初始化如下 返回按照 13 2 中的规定创建新 Function 对象的结果 其参数由 FormalParameterListopt 指定 主体由 FunctionBo
  • 作曲家自动加载

    我目前正在尝试将 PSR 0 自动加载与 Composer 结合使用 但出现以下错误 Fatal error Class Twitter Twitter not found 我的目录结构如下 Project src Twitter Twit
  • 如何将多处理与请求模块一起使用?

    我是 python 的新开发者 我的代码是下面的代码 import warnings import requests import multiprocessing from colorama import init init autores