分区:如何在每个分区后添加等待

2023-12-08

我有一个每分钟接受 20 个请求的 API,之后我需要等待 1 分钟才能查询它。我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以使用Partitioner将我的列表分为 20 个项目/请求,但很快我意识到Partitioner不是这样工作的,我的第二个想法是添加一个delay在分区中,但这也是一个坏主意,根据我的理解,它会在每个不需要的请求之后添加延迟,相反,我需要在每个请求之后延迟Partition。下面是我的代码:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

有谁知道我怎样才能做到这一点?


这里有一个RateLimiter您可以使用该类来限制异步操作​​的频率。这是一个更简单的实现RateLimiter找到的类this answer.

/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Schedule the release of the semaphore using a Timer.
        // Use the newly created Timer object as the state object, to prevent GC.
        // Handle the unlikely case that the _timeUnit is invalid.
        System.Threading.Timer timer = new(_ => _semaphore.Release());
        try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
        catch { _semaphore.Release(); throw; }
    }
}

使用示例:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

在线演示.

The Timer是用这个特定的构造函数以防止它在触发之前被垃圾收集,如中所述这个答案作者:尼克·H.

Note:这个实现有点泄漏,因为它创建了内部一次性的System.Threading.Timer对象,当您使用完后不会被处置RateLimiter。任何活动的计时器都会阻止RateLimiter从被垃圾收集直到这些计时器触发它们的回调。还有SemaphoreSlim未处置正如它应该。这些都是小缺陷,不太可能影响仅创建少数内容的程序RateLimiters。如果您打算创建很多,您可以看看第三次修订这个答案的特点是一次性RateLimiter基于Task.Delay method.


这是一个替代实现RateLimiter类,更复杂,它基于Environment.TickCount64属性而不是SemaphoreSlim。它的优点是不会在后台创建“即发即忘”计时器。缺点是WaitAsync方法不支持CancellationToken争论,并且由于复杂性,出现错误的可能性更高。

public class RateLimiter
{
    private readonly Queue<long> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly int _timeUnitMilliseconds;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _queue = new Queue<long>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
    }

    public Task WaitAsync()
    {
        int delayMilliseconds = 0;
        lock (_queue)
        {
            long currentTimestamp = Environment.TickCount64;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                long refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                Debug.Assert(delayMilliseconds >= 0);
                if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delayMilliseconds
                + _timeUnitMilliseconds);
        }
        if (delayMilliseconds == 0) return Task.CompletedTask;
        return Task.Delay(delayMilliseconds);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分区:如何在每个分区后添加等待 的相关文章

随机推荐

  • C++类成员函数指针指向函数指针

    我使用 luabind 作为我的 lua 到 C 包装器 Luabind提供了一种使用我自己的回调函数来处理lua抛出的异常的方法 set pcall callback 因此 我解释了文档中的一个示例 更改是 logger gt log 函
  • hibernate envers:合并和 saveOrUpdate

    我正在开发 spring hibernate envers 应用程序 经过大量谷歌搜索后 事情终于对我有用 但我仍然有几个问题 早些时候我正在使用saveOrUpdate为了 保存或更新实体 但 当与恩弗斯一起工作时 它是 扔一个nonUn
  • Windows 应用程序中 DataGridView 中的货币格式

    我无法在 DataGridView 上显示货币格式 你们能看一下这段代码吗 private void dataGridView1 DataBindingComplete object sender DataGridViewBindingCo
  • R 函数(如 str()、summary() 和 head())的 Python pandas 等价物是什么?

    我只知道describe 功能 还有其他类似的功能吗str summary and head 在熊猫中info 方法创建与 R 非常相似的输出str gt str train data frame 891 obs of 13 variabl
  • 单击时循环遍历数组

    我想知道如何在单击时循环遍历数组中的值 当显示数组的最后一个值时 下一次单击应再次显示数组的第一个值 我认为我已经很接近了 但是当我到达数组的最后一个值时 我必须单击两次才能再次显示第一个值 这是我的 JavaScript var myAr
  • PHP - 使用explode()函数将值分配给关联数组

    我想分解一个字符串 但结果数组具有特定的字符串作为键而不是整数 IE 如果我有一个字符串 Joe Bloggs 我想将其分解 以便我有一个关联数组 例如 arr first name Joe arr last name Bloggs 目前
  • 在 vscode 中安装 ionide-fsharp 时出现错误“未找到中央目录记录签名结尾”

    我已经安装了 VS Code 版本 1 8 1 机器是Windows 7 64位 安装 ionide fsharp 扩展时 出现错误 未找到中央目录记录签名末尾 VS Code 的 1 7 2 版本似乎可以工作 但是这个问题似乎在 1 8
  • android 对话框上的轮式选择器

    我想从旋转轮获取文本作为密码 您能给我任何用于从中获取文本的旋转轮的示例吗 我没有得到任何好的例子 提前致谢 我试图得到这个旋转轮 最后我已经构建了我的轮子来从用户那里获取文本 我给出了四个整数的例子 package com example
  • 在ggplot2中使用facet_grid()函数时,如何使用labeller()函数让列总计出现在facet的标签中

    这是一个数据集 可以为我的问题提供背景信息 library tidyr library dplyr library ggplot2 set seed 1 dfr2 lt tibble x1 factor sample letters 1 3
  • 如何在 BitmapFactory 中保持图像质量相同

    我已将位图图像转换为字符串以保存它 Bitmap photo extras getParcelable data ByteArrayOutputStream baos new ByteArrayOutputStream photo comp
  • 将许多子目录拆分为一个新的、单独的 Git 存储库

    这个问题与 将许多子目录分离到新的单独的 git 存储库中 Git 子树和多个目录 我不想分离单个子目录 而是想分离几个子目录 例如 这是我的文件夹结构 app1 file1 file2 folder1 folder2 app2 file3
  • 来自输入文件的动态数组

    我是初学者 所以如果这确实是一个愚蠢的问题 我很抱歉 我的任务是从输入文件中打印出动态数组 我尝试用谷歌搜索它 发现了一些类似的问题 但答案都是 使用向量 等 但我们还没有学到这些 还说必须使用函数 这就是我想出的 include
  • 如何从 PL/pgSQL 写入磁盘上的文件?

    我想做相当于 c 或 php fopen 和 fwrite 的操作 我不想将表转储到磁盘 我正在尝试在开发过程中进行一些调试日志记录 您可以在 postgres 函数中使用 plpythonu f open f write f close
  • Bootstrap 下拉菜单隐藏在模式中

    您好 我正在尝试获取引导下拉列表以显示模型内的列表 我想我要说的是 当我单击下拉菜单时 它会展开 但如果列表比模型长 它将切断列表的其余部分 导致用户无法选择所有选项 我一直在谷歌搜索并看到这篇文章点击这里这与我的问题非常相似 然而 他们说
  • Pandas 使用正则表达式分隔符读取 csv

    我一直在尝试读取这样的自定义 csv 文件 6 Rotterdam NLD Zuid Holland 593321 19 Zaanstad NLD Noord Holland 135621 214 Porto Alegre BRA Rio
  • java 类型推断是如何工作的?

    有人可以解释一下以下语法是如何工作的吗 public static
  • 有关 PHP 中网络爬虫的错误

    我正在尝试使用 PHP 创建一个简单的网络爬虫 它能够爬行 edu 域 并提供父级的种子 url 我使用了简单的html dom来实现爬虫 而一些核心逻辑是我自己实现的 我将发布下面的代码并尝试解释这些问题 private function
  • ios 在“确认您的应用内购买”对话框中显示不同的价格

    为 iOS 应用添加应用内购买 我可以成功购买该产品 但 确认您的应用内购买 对话框始终显示与我格式化的本地化价格相比的额外价格 示例 产品的本地化价格为 39 99 欧元 但在确认对话框中 产品的价格增加到 41 73 欧元 预先感谢您提
  • Maven的pom.xml中的pluginManagement是什么?

    这是我的 pom 文件的一个片段
  • 分区:如何在每个分区后添加等待

    我有一个每分钟接受 20 个请求的 API 之后我需要等待 1 分钟才能查询它 我有一个项目列表 通常超过 1000 个 我需要从 API 查询其详细信息 我的想法是我可以使用Partitioner将我的列表分为 20 个项目 请求 但很快