aiohttp.TCPConnector (带有 limit 参数)与 asyncio.Semaphore 用于限制并发连接数

2024-01-05

我想我想通过制作一个简单的脚本来学习新的 python async wait 语法,更具体地说是 asyncio 模块,该脚本允许您一次下载多个资源。

但现在我被困住了。

在研究时,我发现了两种限制并发请求数量的选项:

  1. 将 aiohttp.TCPConnector (带有 limit 参数)传递给 aiohttp.ClientSession 或
  2. 使用 asyncio.Semaphore。

如果您只想限制并发连接数,是否有首选选项或者可以互换使用它们? 就性能而言(大致)相同吗?

而且两者似乎都有 100 个并发连接/操作的默认值。如果我仅使用限制为 500 的信号量,aiohttp 内部是否会隐式将我锁定为 100 个并发连接?

这对我来说都是非常新的和不清楚的。请随时指出我的任何误解或代码中的缺陷。

这是我当前包含两个选项的代码(我应该删除哪个?):

奖金问题:

  1. 如何处理(最好重试 x 次)抛出错误的 coros?
  2. coro 完成后立即保存返回数据(通知我的 DataHandler)的最佳方法是什么?我不希望最后保存所有内容,因为我可以尽快开始处理结果。

s

import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # This is option 1: The semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        with (await self.semaphore):
            async with session.get(url) as response:
                # Bonus Question 1: What is the best way to retry a request that failed?
                resp_task = asyncio.ensure_future(response.read())
                self.sending_bar.update(1)
                resp = await resp_task

                await  response.release()
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # This is option 2: Limiting the number of open connections directly via the TCPConnector
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content_task = asyncio.ensure_future(self.fetch(session, url))
        content = await content_task
        # Bonus Question 2: This is blocking, I know. Should this be wrapped in another coro
        # or should I use something like asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Reveived', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()


### call like so ###

URL_PATTERN = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield URL_PATTERN.format(i)   

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download([g for g in gen_url(upper=1000)])

有首选的选择吗?

是的,见下图:

aiohttp 内部会隐式将我的并发连接数限制为 100 个吗?

是的,默认值 100 会锁定您,除非您指定其他限制。 您可以在此处的源代码中看到它:https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084 https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084

它们在性能方面(大致)相等吗?

否(但性能差异应该可以忽略不计),因为aiohttp.TCPConnector无论如何,检查可用连接,无论它是否被信号量包围,在这里使用信号量只是不必要的开销。

如何处理(最好重试 x 次)抛出错误的 coros?

我不相信有一种标准方法可以做到这一点,但一种解决方案是将您的调用包装在如下方法中:

async def retry_requests(...):
    for i in range(5):
        try:
            return (await session.get(...)
        except aiohttp.ClientResponseError:
            pass
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

aiohttp.TCPConnector (带有 limit 参数)与 asyncio.Semaphore 用于限制并发连接数 的相关文章

随机推荐

  • 检查 url 是否包含 http:// 或 https:// [重复]

    这个问题在这里已经有答案了 可能的重复 检查 url 是否包含 http 或 https https stackoverflow com questions 7334491 check if the url is contains the
  • 如何在加载时打开 React Native Maps 标记的标注

    我希望在安装屏幕组件时打开所有标记的所有标注 目前 它仅在单击标记时打开 如何在功能组件中使用 useRef 来执行此操作 const markerRef useRef React createRef return
  • 使用 C++17 Constexpr 查找数组

    我正在尝试编写一个 constexpr find 函数 它将返回包含特定值的 std array 的索引 下面的函数似乎工作正常 除非包含的类型是const char include
  • 哪个班级设计比较好? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 哪个类设计更好 为什么 public class User public String UserName public String
  • 在 OS X 上以管理员权限运行脚本

    我已经尽力在 Stack Overflow 和互联网上找到许多脚本问题的解决方案 但我似乎找不到我需要的解决方案 我想要做的是创建一个更加自动化且点击次数更少的解决方案来删除系统上的所有移动缓存用户帐户 我一直在登录并手动转到用户帐户 然后
  • 如何设置 clojureScript 项目以使用规范并在运行时测试 clojure.core 函数?

    Clojure 1 9 推出specs https clojure org guides spec clojure core 库中的函数现在有规范 如何设置 clojurescript 项目以使用规范并在运行时测试 clojure core
  • 我可以采取什么措施来加快 S3 上传/更新速度?

    今天我一整天都在尝试向 s3 上传一些小东西 500 个目录中约有 20k 个文件 总计约 3GB 对于名为 简单存储服务 的服务来说 这是绝对合理的 我可以平均以大约 500k s 1mb s 1 8 到 3 6 GB h 之间 的速度上
  • Java 中最好的企业购物车是什么? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 未针对早午餐编译供应商 CSS 文件

    我对 b 有疑问 电子邮件受保护 cdn cgi l email protection不编译 Bower Component CSS 文件 如同在 Brunch 中分离应用程序和供应商 CSS https stackoverflow com
  • 在 C 中创建数组时出现分段错误

    我最近迁移到一台新笔记本电脑 HP dv6119tx 英特尔酷睿 i5 4 GB RAM 它安装了 Windows 7 Home Premium 64 位 我正在尝试创建一个类型的数组int长度为 10 6 的 C Dev C 我曾经在我的
  • 在 React Native 中使用 PanResponder 锁定移动

    使用本机反应泛响应器 https facebook github io react native docs panresponder html 当屏幕触摸坐标超出一定值范围时 如何阻止移动 例如 如何防止用户将组件移动到屏幕上某个 y 位置
  • 比较堆转储 (HPROF) 文件

    是否可以比较两个 HPROF 文件 如何 根据我的发现 您只能比较对象的直方图 为此 请转到 直方图 视图 然后单击 与另一个堆转储比较 并选择另一个 hprof 文件 Here is screenshot
  • 获取孩子的所有孩子等等

    我使用 MongoDb 作为数据库 我想要所有孩子的孩子等等 让我们假设 A 有 B 和 C 孩子 B 有 D 和 E 孩子 D 有 F 和 G 孩子 所以当我查询子节点时A 我将所有孩子作为输出 例如 B C D E F G C Cust
  • 检查一个数据帧的值是否按确切顺序存在于另一个数据帧中

    我有 1 个数据数据框和多个 参考 数据框 我正在尝试自动检查数据帧的值是否与参考数据帧的值匹配 重要的是 这些值的顺序也必须与参考数据帧中的值相同 这些列是重要的列 但我的真实数据集包含更多列 下面是一个玩具数据集 Dataframe g
  • 1个月后自动将列表数据从一个列表复制到另一个列表

    我列出了在提交信息路径表单后动态存储数据的列表 我想在任何数据创建日期 30 天后存档此数据 你能建议我该怎么做吗 看看我可以通过工作流程做到这一点 但我如何设置条件 在创建任何列表后 30 天完成后 它将自动复制到其他列表中 首先我想问为
  • 如何防止XSS攻击

    渗透测试团队告诉我 以下 URL 正在引发 XSS 攻击 这是我的 download msg jsp 代码
  • 存储过程参数默认值

    我正在尝试创建一个带有默认参数的存储过程 在我的查询中我会这样做 DECLARE mydate DATETIME DECLARE MT DATETIME DECLARE MY DATETIME SELECT mydate GETDATE S
  • 填充seaborn / matplotlib中两个正态分布之间的重叠区域

    我想填充两个正态分布之间重叠的区域 我有x最小值和最大值 但我不知道如何设置y边界 我看过plt文档 https matplotlib org gallery lines bars and markers fill between demo
  • 使用 mongo-cxx-driver 构建 C++ 项目时出现链接错误

    我目前正在开发一个C 需要使用的应用程序mongo cxx driver用于访问MongoDB实例 我尝试了几种安装方法 但每次都会遇到相同的链接器问题 最初 我尝试安装mongo cxx drivers and mongod c driv
  • aiohttp.TCPConnector (带有 limit 参数)与 asyncio.Semaphore 用于限制并发连接数

    我想我想通过制作一个简单的脚本来学习新的 python async wait 语法 更具体地说是 asyncio 模块 该脚本允许您一次下载多个资源 但现在我被困住了 在研究时 我发现了两种限制并发请求数量的选项 将 aiohttp TCP