使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中进行数据传播

2024-01-21

在我的生产者-消费者场景中,我有多个消费者,每个消费者都向外部硬件发送一个操作,这可能需要一些时间。我的管道看起来有点像这样:

BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks

我已将 ActionBlocks 的 BoundedCapacity 指定为 1。 理论上我想要的是,仅当我的一个操作块可用于操作时,我才想触发 Batchblock 将一组项目发送到 Transformblock。直到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递到 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我对 BatchBlock 批量大小确实有一个非常高的上限,但是我真的不希望达到该限制,我想根据执行上述任务的 Actionblocks 的可用性来触发我的批次。

我在 Triggerbatch() 方法的帮助下实现了这一点。我将 Batchblock.Triggerbatch() 称为 ActionBlock 中的最后一个操作。然而有趣的是,经过几天的正常工作后,管道出现了故障。经过检查,我发现有时批处理块的输入是在 ActionBlock 完成工作后才输入的。在这种情况下,ActionBlock 在其工作结束时实际上会调用 Triggerbatch,但是由于此时根本没有输入到 Batchblock,因此对 TriggerBatch 的调用是徒劳的。一段时间后,当输入流入 Batch 块时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些内容的东西,但是没有这样的功能可用,我也找不到一种方法来检查 TriggerBatch 是否有效。

谁能建议一个可能的解决方案来解决我的问题。不幸的是,使用计时器来触发批次对我来说不是一个选择。除了 Pipeline 的启动之外,节流应该仅由 ActionBlock 之一的可用性来控制。

示例代码在这里:

    static BatchBlock<int> _groupReadTags;

    static void Main(string[] args)
    {
        _groupReadTags = new BatchBlock<int>(1000);

        var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
        BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
        int batchNo = 1;


        TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
        {

            Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
            //_groupReadTags.TriggerBatch();
            int sum = 0;

            foreach (int item in list)
            {
                Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
                sum += item;

            }
            batchNo++;
            return sum;

        });

            ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
            {
                Console.WriteLine("Number from ONE :{0}",x);
                await Task.Delay(500);

                    Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);

                _groupReadTags.TriggerBatch();



        },consumerOptions);

        ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
        {
            Console.WriteLine("Number from TWO :{0}", x);
            await Task.Delay(2000);
            _groupReadTags.TriggerBatch();

        }, consumerOptions);

        _groupReadTags.LinkTo(_workingBlock);
        _workingBlock.LinkTo(_frameBuffer);
        _frameBuffer.LinkTo(_worker1);
        _frameBuffer.LinkTo(_worker2);

        _groupReadTags.Post(10);
        _groupReadTags.Post(20);
        _groupReadTags.TriggerBatch();

        Task postingTask = new Task(() => PostStuff());
        postingTask.Start();
        Console.ReadLine();

    }



    static void PostStuff()
    {


        for (int i = 0; i < 10; i++)
            {
                _groupReadTags.Post(i);
                Thread.Sleep(100);
            }

        Parallel.Invoke(
            () => _groupReadTags.Post(100),
            () => _groupReadTags.Post(200),
            () => _groupReadTags.Post(300),
            () => _groupReadTags.Post(400),
            () => _groupReadTags.Post(500),
            () => _groupReadTags.Post(600),
            () => _groupReadTags.Post(700),
            () => _groupReadTags.Post(800)
                       );
    }

这是一个替代方案BatchBlock https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.batchblock-1具有一些额外功能的实现。它包括一个TriggerBatch具有此签名的方法:

public int TriggerBatch(int nextMinBatchSizeIfEmpty);

如果输入队列不为空,调用此方法将立即触发批处理,否则将设置临时MinBatchSize这只会影响下一批。您可以使用较小的值来调用此方法nextMinBatchSizeIfEmpty确保在当前无法生产批次的情况下,下一个批次将比配置的时间更早发生BatchSize在块的构造函数中。

此方法返回生产批次的大小。它返回0如果输入队列为空,或者输出队列已满,或者块已完成。

public class BatchBlockEx<T> : ITargetBlock<T>, ISourceBlock<T[]>
{
    private readonly ITargetBlock<T> _input;
    private readonly IPropagatorBlock<T[], T[]> _output;
    private readonly Queue<T> _queue;
    private readonly object _locker = new object();
    private int _nextMinBatchSize = Int32.MaxValue;

    public Task Completion { get; }
    public int InputCount { get { lock (_locker) return _queue.Count; } }
    public int OutputCount => ((BufferBlock<T[]>)_output).Count;
    public int BatchSize { get; }

    public BatchBlockEx(int batchSize, DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        dataflowBlockOptions = dataflowBlockOptions ?? new DataflowBlockOptions();
        if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded &&
            dataflowBlockOptions.BoundedCapacity < batchSize)
            throw new ArgumentOutOfRangeException(nameof(batchSize),
            "Number must be no greater than the value specified in BoundedCapacity.");

        this.BatchSize = batchSize;

        _output = new BufferBlock<T[]>(dataflowBlockOptions);

        _queue = new Queue<T>(batchSize);

        _input = new ActionBlock<T>(async item =>
        {
            T[] batch = null;
            lock (_locker)
            {
                _queue.Enqueue(item);
                if (_queue.Count == batchSize || _queue.Count >= _nextMinBatchSize)
                {
                    batch = _queue.ToArray(); _queue.Clear();
                    _nextMinBatchSize = Int32.MaxValue;
                }
            }
            if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);

        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1,
            CancellationToken = dataflowBlockOptions.CancellationToken
        });

        var inputContinuation = _input.Completion.ContinueWith(async t =>
        {
            try
            {
                T[] batch = null;
                lock (_locker)
                {
                    if (_queue.Count > 0)
                    {
                        batch = _queue.ToArray(); _queue.Clear();
                    }
                }
                if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
            }
            finally
            {
                if (t.IsFaulted)
                {
                    _output.Fault(t.Exception.InnerException);
                }
                else
                {
                    _output.Complete();
                }
            }
        }, TaskScheduler.Default).Unwrap();

        this.Completion = Task.WhenAll(inputContinuation, _output.Completion);
    }

    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception ex) => _input.Fault(ex);

    public int TriggerBatch(Func<T[], bool> condition, int nextMinBatchSizeIfEmpty)
    {
        if (nextMinBatchSizeIfEmpty < 1)
            throw new ArgumentOutOfRangeException(nameof(nextMinBatchSizeIfEmpty));
        int count = 0;
        lock (_locker)
        {
            if (_queue.Count > 0)
            {
                T[] batch = _queue.ToArray();
                if (condition == null || condition(batch))
                {
                    bool accepted = _output.Post(batch);
                    if (accepted) { _queue.Clear(); count = batch.Length; }
                }
                _nextMinBatchSize = Int32.MaxValue;
            }
            else
            {
                _nextMinBatchSize = nextMinBatchSizeIfEmpty;
            }
        }
        return count;
    }

    public int TriggerBatch(Func<T[], bool> condition)
        => TriggerBatch(condition, Int32.MaxValue);

    public int TriggerBatch(int nextMinBatchSizeIfEmpty)
        => TriggerBatch(null, nextMinBatchSizeIfEmpty);

    public int TriggerBatch() => TriggerBatch(null, Int32.MaxValue);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _input.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
    {
        return _output.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        return _output.ReserveMessage(messageHeader, target);
    }

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        _output.ReleaseReservation(messageHeader, target);
    }

    IDisposable ISourceBlock<T[]>.LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _output.LinkTo(target, linkOptions);
    }
}

另一个超载TriggerBatch方法允许检查当前可以生产的批次,并决定是否应该触发它:

public int TriggerBatch(Func<T[], bool> condition);

The BatchBlockEx类不支持Greedy https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.groupingdataflowblockoptions.greedy and MaxNumberOfGroups https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.groupingdataflowblockoptions.maxnumberofgroups内置选项BatchBlock.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中进行数据传播 的相关文章

  • 在 C# 中转换 VbScript 函数(Right、Len、IsNumeric、CInt)

    同样 我在 VbScript 中得到了以下代码 您能建议一下 C 中的等效代码吗 Function GetNavID Title getNavID UCase Left Title InStr Title 1 End Function 我已
  • 如何从更高级别启动用户级别的 Exe

    我希望一个进程始终在用户级别运行 当它由以管理员级别运行的安装程序 自定义 而不是 msi 启动时 或者当用户登录时 环顾四周 我不确定这是否可能 最简单的方法是有 2 个进程 一种是普通用户 它启动提升 管理进程 然后管理进程可以使用 I
  • Excel的解析路径

    其实我想问以下问题 对于位于 目录中定义的 PATH 怎么能 我找出这些目录中的哪个 找到了 因为我需要使用 Process Run 从 C 运行 Excel 并且只需指示 Excel 即可正常工作 Windows 似乎知道在哪里可以找到它
  • C++:Linux平台上的线程同步场景

    我正在为 Linux 平台实现多线程 C 程序 其中我需要类似于 WaitForMultipleObjects 的功能 在搜索解决方案时 我发现有一些文章描述了如何在 Linux 中实现 WaitForMultipleObjects 功能
  • C++11 中具有 C 链接的复杂类型

    我需要将 C 库的标头包含到我的 C 11 代码中 现在 标头提供了涉及大量的例程和数据结构double complex到处都是 例如 include
  • 使用 strcpy 从整数生成指针,无需进行强制转换

    我不明白我做错了什么 我正在学习 C 很抱歉 如果这显然是错误的 但我正在尝试使用uthash http uthash sourceforge net 制作股票及其价格的哈希图 但是当我将股票添加到哈希映射时 我收到上述错误 我所做的就是从
  • 使用 C# 和反射打印完整的对象图

    我有一个复杂的对象 class A int Field1 int field2 property ClassB ClassB property classC classC etc etc 我想使用反射打印完整的对象图 有什么好的代码吗 一种
  • 通过 TCP/.NET SSLStream 发送文件很慢/无法正常工作

    我正在编写一个与 SSL 配合使用的服务器 客户端应用程序 通过SSLStream 它必须做很多事情 不仅仅是文件接收 发送 目前 它的工作原理是 只有一个连接 我总是使用从客户端 服务器发送数据SSLStream WriteLine 并使
  • 如何在Qt3D中优化点云渲染

    我正在尝试使用 Qt3D 显示大型点云 20M pts 我第一次发现这个图书馆https github com MASKOR Qt3DPointcloudRenderer https github com MASKOR Qt3DPointc
  • DataGridView小数不排序

    好吧 我有一个 DataGridView 它的数据绑定如下 dataGridViewChartOre AutoGenerateColumns false dataGridViewChartOre DataSource xml GetOreC
  • C# While 循环与 For 循环?

    在 C 中 一个问题已经困扰我一段时间了 它的 While 和 For 循环之间的实际主要区别是什么 它只是纯粹的可读性吗 在 for 循环中本质上可以做的所有事情都可以在 while 循环中完成 只是在不同的地方 举这些例子 int nu
  • 当一种语言是另一种语言的平行超集时,这意味着什么?

    我正在阅读关于实时并发 C 的期刊文章 http link springer com article 10 1007 2FBF00365999 并且它在摘要中提到 因此你们中的任何人都可以通过该链接查看上下文 Concurrent C 是
  • 在 Ubuntu 16.04 上编译 PCL 1.7,CMake 生成的 Makefile 中出现错误

    我正在尝试让 PCL 1 7 点云库 而不是其他 pcl 在 Ubuntu 16 04 上运行 我最终希望用于 C 的东西 但现在我只是想让这些例子工作 我使用的是 Ubuntu GNU 5 3 1 附带的默认编译器和 Cmake 版本 3
  • 初始化二维数组时出现分段错误

    我已经检查过我的代码是否正确地划分了内存空间 但是一旦我尝试将 2D 数组初始化为某些值 然后对这些值求和 我就会在 2x2 数组上收到分段错误 我想最终将我的代码扩展到更大的数组 但我什至无法让它在这里工作 我知道有很多关于 malloc
  • AllowUserToAddRows 不适用于 DataGridView 上的 List<> 数据源

    我有一个DataGridView与DataSource set to List
  • 在 C# 中设置风扇速度

    我知道以前有人问过这个问题 但我似乎无法让它发挥作用 我已调用以下内容 using System Management using System Management Instrumentation using System Runtime
  • QT C++ QRegularExpression 多个匹配

    我想使用正则表达式从 QString html 中提取信息 我明确想使用正则表达式 无解析器解决方案 和类Q正则表达式 http qt project org doc qt 5 0 qtcore qregularexpression htm
  • C++11 中引入了哪些重大更改?

    我知道 C 11 中至少有一项更改会导致一些旧代码停止编译 引入explicit operator bool 在标准库中 替换旧实例operator void 诚然 这将破坏的代码可能是一开始就不应该有效的代码 但它仍然是一个破坏性的变化
  • 创建进程的多个子进程并维护所有 PID 的共享数组

    我已经分叉了几次 并用 C 创建了一堆子进程 我想将它们所有的 PID 存储在一个共享数组中 PID 的顺序并不重要 例如 我创建了 32 个进程 我想要一个 32 个整数长的数组来存储每个 PID 并且每个进程都可以访问 最好的方法是什么
  • 如何在 C# 中将 json 转换为平面结构

    我正在尝试用 C 编写函数 将 JSON 转换为键 值对 它应该支持数组 例如下面的 JSON title title value components component id id1 menu title menu title1 tit

随机推荐