具有固定大小 FIFO 队列的生产者/消费者模式

2023-12-22

我需要围绕固定大小的 FIFO 队列实现生产者/消费者模式。我thinkConcurrentQueue 周围的包装类可能适用于此,但我不完全确定(而且我以前从未使用过 ConcurrentQueue)。这样做的不同之处在于队列只需要保存固定数量的项目(在我的例子中是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的消费者任务运行时,它需要及时将队列中存在的所有项目出队并处理它们。

就其价值而言,我的消费者对排队项目的处理只不过是通过 SOAP 将它们上传到并非 100% 可靠的 Web 应用程序。如果无法建立连接或调用 SOAP 调用失败,我应该丢弃这些项目并返回队列以获取更多项目。由于 SOAP 的开销,我试图最大限度地增加队列中可以在一次 SOAP 调用中发送的项目数。

有时,我的生产者添加项目的速度可能比我的消费者删除和处理它们的速度快。如果队列已经满并且我的生产者需要添加另一个项目,我需要将新项目放入队列,然后将最旧的项目出队,以便队列的大小保持固定。基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目不会被消耗,因为我的消费者当前正在处理以前的项目)。

关于生产者在队列中的项目固定的情况下保持数量,我从这个问题中发现了一个潜在的想法:

固定大小的队列,根据新的队列自动将旧值出队 https://stackoverflow.com/questions/5852863/fixed-size-queue-which-automatically-dequeues-old-values-upon-new-enques

我目前正在 ConcurrentQueue 周围使用包装类(基于该答案),并使用 Enqueue() 方法,如下所示:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我创建了此类的一个实例,对队列有大小限制,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动我的生产者任务,它开始填充队列。当添加项目导致队列计数超过最大大小时,我的 Enqueue() 方法中的代码似乎可以正常工作,从队列中删除最旧的项目。现在我需要我的消费者任务来使项目出队并处理它们,但这就是我的大脑感到困惑的地方。为我的消费者实现 Dequeue 方法的最佳方法是什么,该方法将及时拍摄队列快照并将所有项目出队以进行处理(在此过程中生产者可能仍在向队列添加项目)?


简单地说,ConcurrentQueue 有一个“ToArray”方法,当输入该方法时,将锁定集合并生成队列中所有当前项目的“快照”。如果您希望为您的消费者提供一块要处理的东西,您可以锁定入队方法具有的同一对象,调用 ToArray(),然后旋转一个while(!queue.IsEmpty) queue.TryDequeue(out trash)在返回您提取的数组之前循环清除队列。

这将是你的GetAll() method:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

由于您必须清除队列,因此您可以简单地将这两个操作结合起来;创建一个适当大小的数组(使用queue.Count),然后当队列不为空时,将一个项目出队并将其放入数组中,然后再返回。

现在,这就是具体问题的答案。我现在必须凭良心戴上我的 CodeReview.SE 帽子并指出以下几点:

  • NEVER use lock(this)。你永远不知道哪些其他对象可能正在使用你的对象作为锁定焦点,因此当该对象从内部锁定自身时会被阻止。最佳实践是锁定一个私有范围的对象实例,通常是为了锁定而创建的实例:private readonly object syncObj = new object();

  • 由于无论如何你都会锁定包装器的关键部分,所以我会使用普通的List<T>而不是并发集合。访问速度更快,更容易清理,因此您将能够比 ConcurrentQueue 更简单地完成您正在做的事情。要排队,请锁定同步对象,在索引零之前插入(),然后使用RemoveRange()从索引Size到列表的当前Count中删除任何项目。要出列,请锁定同一同步对象,调用 myList.ToArray() (来自 Linq 命名空间;与 ConcurrentQueue 的作用几乎相同),然后在返回数组之前调用 myList.Clear()。再简单不过了:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • 您似乎明白您正在使用此模型丢弃排队的项目。这通常不是一件好事,但我愿意给你一个无罪推论。不过,我想说有一种无损的方法可以实现这一点,即使用 BlockingCollection。 BlockingCollection 包装任何 IProducerConsumerCollection(包括大多数 System.Collections.Concurrent 类),并允许您指定队列的最大容量。然后,该集合将阻止任何尝试从空队列中出队的线程,或任何尝试添加到满队列的线程,直到添加或删除项目,以便有东西可以获取或有空间可以插入。这是实现具有最大大小的生产者-消费者队列的最佳方法,否则需要“轮询”以查看是否有消费者需要处理的内容。如果你走这条路,那么只有消费者必须扔掉的东西才会被扔掉;消费者将看到生产者放入的所有行,并对每一行做出自己的决定。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有固定大小 FIFO 队列的生产者/消费者模式 的相关文章

  • 转换 const void*

    我有一个函数返回一个const void 我想用它的信息作为char 我可以将它投射为 C 风格的罚款 char variable但是当我尝试使用reinterpret cast like reinterpret cast
  • 按扩展名过滤搜索文件返回太多结果

    我正在开发一个 C 控制台应用程序 它必须管理 Windows 操作系统上的文件 我需要获取具有特定扩展名的文件名 列表 我找到了很多解决方案 最建议的是以下一种 HANDLE hFind WIN32 FIND DATA data hFin
  • 前向声明类型和“已声明为类类型的非类类型”

    我对以下代码有问题 template
  • 未找到 Boost 库,但编译正常

    我正在尝试在 C 中使用 boost 的文件系统 使用时看起来编译没问题 c c Analyse c o Analyse o g W Wall L usr local lib lboost filesystem lboost system
  • 传递 constexpr 对象

    我决定给予新的C 14的定义constexpr旋转并充分利用它 我决定编写一个小的编译时字符串解析器 然而 我正在努力保持我的对象constexpr将其传递给函数时 考虑以下代码 include
  • 有些有助于理解“产量”

    在我不断追求少吸的过程中 我试图理解 产量 的说法 但我不断遇到同样的错误 someMethod 的主体不能是迭代器块 因为 System Collections Generic List 不是迭代器接口类型 这是我被卡住的代码 forea
  • 如何将 .txt 文件中的数据转换为 xml? C#

    我在一个文本文件中有数千行数据 我想通过将其转换为更容易搜索的内容来轻松搜索 我希望 XML 或其他类型的大型数据结构 尽管我不确定它是否是最好的对于我的想法 每行的数据如下所示 第 31 册 托马斯 乔治 32 34 154 每本书都不是
  • Task.Run 作为反模式?

    我正在将 SQLite NET PCL 库用于我的 WinRT 项目SQliteAsyncConnection类 它提供经典的异步版本SQLiteConnection方法 然而 就该项目而言Github页面 https github com
  • 不同 C++ 文件中的相同类名

    如果两个 C 文件具有相同名称的类的不同定义 那么当它们被编译和链接时 即使没有警告也会抛出一些东西 例如 a cc class Student public std string foo return A void foo a Stude
  • 假装 .NET 字符串是值类型

    在 NET 中 字符串是不可变的 并且是引用类型变量 这通常会让新的 NET 开发人员感到惊讶 因为他们的行为可能会将它们误认为是值类型对象 然而 除了使用实践StringBuilder对于长连接 尤其是 在循环中 在实践中是否有任何理由需
  • 如果输入被重定向则执行操作

    我想知道如果我的输入被重定向 我应该如何在 C 程序中执行操作 例如 假设我有已编译的程序 prog 并且我将输入 input txt 重定向到它 我这样做 prog lt input txt 我如何在代码中检测到这一点 一般来说 您无法判
  • 模板外部链接?谁能解释一下吗?

    模板名称具有链接 3 5 非成员函数模板可以有内部链接 任何其他模板名称应具有外部链接 从具有内部链接的模板生成的实体与在其他翻译单元中生成的所有实体不同 我知道使用关键字的外部链接 extern C EX extern C templat
  • 比较:接口方法、虚方法、抽象方法

    它们各自的优点和缺点是什么 接口方法 虚拟方法 抽象方法 什么时候应该选择什么 做出这一决定时应牢记哪些要点 虚拟和抽象几乎是一样的 虚方法在基类中有一个实现 可以选择重写 而抽象方法则没有 并且must在子类中被覆盖 否则它们是相同的 在
  • 使动态创建的链接标签在 Winforms 中可点击

    我正在制作一个程序 允许用户单击由动态链接标签创建的公司名称 在我想知道如何做到这一点之前 我从未在 C 中使用过链接标签 可为特定用户生成的业务数量各不相同 因此每个用户的链接标签数量并不相同 然后我想捕获业务 ID 以进行 Json 调
  • 如何解压 msgpack 文件?

    我正在将 msgpack 编码的数据写入文件 在编写时 我只是使用 C API 的 fbuffer 如 我为示例删除了所有错误处理 FILE fp fopen filename ab msgpack packer pk msgpack pa
  • WPF DataGrid / ListView 绑定到数组 mvvm

    我们假设你有 N 个整数的数组 表示行数的整数值 在模型中 该整数绑定到视图中的 ComboBox Q1 如何将数组 或数组的各个项目 绑定到 DataGrid 或 ListView 控件 以便 当您更改 ComboBox 值时 只有那么多
  • C++:二叉树所有节点值的总和

    我正在准备面试 我被一个二叉树问题困住了 我们如何计算二叉树所有节点中存在的值的总和 优雅的递归解决方案 伪代码 def sum node if node NULL return 0 return node gt value sum nod
  • 没有“对 *this”功能的右值引用的解决方法

    我有一个围绕可移动对象的代理容器类 并希望代理能够隐式生成对底层对象的右值引用 但仅当代理本身被移动时 我相信我将能够按照提案 n2439 实施此行为 将移动语义扩展到 this http www open std org jtc1 sc2
  • 为什么空循环使用如此多的处理器时间?

    如果我的代码中有一个空的 while 循环 例如 while true 它将把处理器的使用率提高到大约 25 但是 如果我执行以下操作 while true Sleep 1 它只会使用大约1 那么这是为什么呢 更新 感谢所有精彩的回复 但我
  • 当用户更改 Windows 中的语言键盘布局时如何通知?

    I want to show a message to user when the user changes the language keyboard layout of Windows for example from EN to FR

随机推荐