并行调用 API,每分钟有硬性限制

2023-12-30

我正在尝试对 API 进行并行调用。在停止之前,API 的调用限制为每分钟 1,200 次。在低于限制的情况下异步最有效的方法是什么?

def remove_html_tags(text):
    """Remove html tags from a string"""
    import re
    clean = re.compile('<.*?>')
    return re.sub(clean, ' ', text)

async def getRez(df, url):
async with aiohttp.ClientSession() as session:
        auth = aiohttp.BasicAuth('username',pwd)


        r = await session.get(url, auth=auth)


        if r.status == 200:
            content = await r.text()
            text = remove_html_tags(str(content))

        else:
            text = '500 Server Error'
        df.loc[df['url'] == url, ['RezText']] = [[text]]
        df['wordCount'] = df['RezText'].apply(lambda x: len(str(x).split(" ")))
        data = df[df["RezText"] != "500 Server Error"]


async def main(df):
    df['RezText'] = None
    await asyncio.gather(*[getRez(df, url) for url in df['url']])

loop = asyncio.get_event_loop()
loop.run_until_complete(main(data))

1200每分钟呼叫次数等于20每秒调用一次,这样您就可以将请求分成batches20 并在批次之间休眠一秒钟。

另一种选择是使用aiohttp.TCPConnector(limit=20)对于客户端会话,但这仅限制了数量并发请求,因此您最终可能会执行更多请求(如果 API 响应速度快于一秒)或更少请求(如果 API 响应速度慢于一秒);看this https://stackoverflow.com/questions/61774125/aiohttp-set-number-of-requests-per-second相关问题。

批量示例:

# python 3.7+
import aiohttp
import asyncio

async def fetch(session, url):
    data = None
    async with session.get(url) as response:
        if response.status != 200:
            text = await response.text()
            print("cannot retrieve %s: status: %d, reason: %s" % (url, response.status, text))
        else :
            data = await response.json()
    return data

async def main(n):
    print("starting")
    session = aiohttp.ClientSession()
    tasks = []
    batch = []
    for i in range(n):
        batch.append("http://httpbin.org/anything?key=a%d" % i)
        if len(batch) >= 20:
            print("issuing batch %d:%d" % (i-20+1, i+1))
            for url in batch:
                task = asyncio.create_task(fetch(session, url))
                tasks.append(task)
            batch = []
            await asyncio.sleep(1)
    if batch:  # if batch length does not divide n evenly consume last batch
        print("issuing last batch %d:%d" % (n-len(batch), n))
        for url in batch:
            task = asyncio.create_task(fetch(session, url))
            tasks.append(fetch(session, url))
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    await session.close()
    for response in responses:
        assert "args" in response
        # note that the responses will be in the order in which the requests were made
    print("finished")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(111))

Output

starting
issuing batch 0:20
issuing batch 20:40
issuing batch 40:60
issuing batch 60:80
issuing batch 80:100
issuing last batch 100:111
finished

这里重要的一点是asyncio.create_task(创建一个任务并启动它,返回一个任务对象),await asyncio.sleep(1)(用于限制请求)和await asyncio.gather(等待所有任务完成运行)。
对于 Python asyncio.ensure_future代替asyncio.create_task.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

并行调用 API,每分钟有硬性限制 的相关文章

随机推荐

  • 如何在 SQLite 中插入具有唯一 ID 的重复行?

    这看起来很简单 我想在 SQLite 表中复制一行 INSERT INTO table SELECT FROM table WHERE rowId 5 如果没有显式的唯一列声明 则该语句将起作用 但声明了表的第一列rowID INTEGER
  • 使用ActionCable,多种识别方式

    我使用 ActionCable 开发 Ruby on Rails 5 1 应用程序 通过 Devise 进行用户身份验证 https rubytutorial io actioncable devise authentication 适用于
  • Intellij 调试 Docker 容器不断给我 IO 异常握手失败

    我正在尝试在 Intellij v2020 1 中设置远程调试器 但不断收到以下错误 无法打开调试器端口 localhost 5005 java io IOException 握手失败 连接过早关闭 在我的 docker compose 文
  • 是否可以在cmake中不生成ALL_BUILD项目?

    我不需要 ALL BUILD 子项目 我可以避免生成它吗 谢谢 CMake 问题 16979 正在生成 ALL BUILD 目标 https gitlab kitware com cmake cmake issues 16979 The A
  • Facebook Graph API gem

    我想在我的 Rails 应用程序中使用 Facebook 图形 API 与 FB 图形 API 一起使用的推荐 gem 是什么 Thanks 这是一个写得很好的 FB 图形支持 Ruby https github com nov fb gr
  • 在 SSRS 中复制并粘贴表 (tablix)

    我有一个包含一天数据的 tablix 我需要在底部有相同的数据 但在 3 个不同的行中 我想在底部复制主要日期表 3 次 然后为每个表使用不同的数据集 我尝试复制 tablix 并将其粘贴到Body在 tablix 下方 但出现以下错误 报
  • C#:将数组分配给另一个数组:复制还是指针交换?

    抱歉问这个问题 我一直在谷歌上搜索了一下 但似乎出现的是对克隆或复制方法的引用 而不是我的问题的实际答案C 我有两个字节数组 它们正在被两个线程访问 private byte buffer1 new byte size private by
  • 如何使用 python 读取 CSV 文件时跳过空白行

    这是我的代码 我可以打印每一行 但是当出现空白行时 它会打印 由于CSV文件格式 所以我想在出现空行时跳过 import csv import time ifile open C Users BKA4ABT Desktop Test Spe
  • 如何将输出的 Fortran 二进制 NxNxN 矩阵读入 Python

    我用 Fortran 写出了一个矩阵 如下所示 real kind kind 0 0d0 dimension 256 256 256 dense CALCULATION inquire iolength reclen dense open
  • 如何在不不断扫描的情况下检测目录或文件何时发生更改

    除了读取所有文件并将它们与以前的快照进行比较之外 有没有办法在 Windows 中的 C 中检测目录何时发生更改 如果需要的话 我不介意 PInvoke EDITFileSystemWatcher 类很棒 但有一个问题是您必须启动后台任务
  • 一对一字段 Django 管理员

    编辑为使用一对一字段 我想将建筑物的面积添加到 django modeladmin 中 表结构是 class Area models Model id models IntegerField Buildings db column id a
  • Kubernetes Nginx Ingress 删除部分 URL

    我正在 Kubernetes 在 AKS 上 部署一个简单的应用程序 该应用程序位于使用 Nginx 的 Ingress 后面 并使用 Nginx helm 图表进行部署 我遇到一个问题 由于某种原因 Nginx 似乎没有将完整的 URL
  • AWS lambda读取zip文件执行验证并解压到s3存储桶(如果验证通过)

    我有一个 zip 文件到达 s3 存储桶的要求 我需要使用 python 编写一个 lambda 来读取 zip 文件 执行一些验证并在另一个 S3 存储桶上解压缩 Zip 文件包含以下内容 a csv b csv c csv trigge
  • 读取 HttpPost 响应

    我使用此代码向 http 服务器发送请求 HttpClient client new DefaultHttpClient HttpPost post new HttpPost http 192 168 0 1 test php HttpRe
  • 一步一步 oAuth Rest C# winform 示例 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我已经尝试了一段时间了 需要从头开始
  • XML RPC - 从 C# 调用 python 函数

    我正在使用库xml rpc net 2 5 0用于在 C 中创建调用一些 python 方法的 XML RPC 客户端 客户端位于Windows7机器中 服务器位于运行red hat的VMWare中 调用python函数的客户端代码 mai
  • 在另一个类中使用实例方法作为装饰器

    我正在尝试创建一个类 MySerial 实例化一个串行对象 以便我可以写入 读取串行设备 UART 有一个实例方法是一个装饰器 它包装了属于完全不同的类的函数 App 因此装饰器负责写入和读取串行缓冲区 如果我创建一个实例MySerial在
  • 如何在apache solr中以原始嵌套形式检索json?

    我在用阿帕奇 索尔 for 文本搜索 我有nested document structure 这是one json file id 1 info first name John last name Doe gender male 我创建了一
  • 将项目与 Eclipse 中的 Sonar 关联

    我有 Eclipse Indigo 3 7 基础和 MyEclipse 10 Java 企业开发插件 我已经为eclipse安装了Sonar插件 安装 Sonar 服务器并从本地主机 localhost 9000 本地运行它 在 Eclip
  • 并行调用 API,每分钟有硬性限制

    我正在尝试对 API 进行并行调用 在停止之前 API 的调用限制为每分钟 1 200 次 在低于限制的情况下异步最有效的方法是什么 def remove html tags text Remove html tags from a str