原子地从 ConcurrentQueue 中获取所有内容

2024-04-30

我有多个线程生成项目并将它们粘贴在一个公共的ConcurrentQueue:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

我有另一个单一消费者线程,但它需要在此应用程序的上下文中工作的方式是,偶尔,它只需要抓取一切当前位于线程队列中,将其从该队列中删除,这一切都是一次性完成的。就像是:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

我想我浏览了所有文档(针对集合及其实现的接口),但我似乎没有找到类似“同时从队列中获取所有对象”,甚至“同时与另一个队列交换内容”之类的内容。

如果我放弃的话我可以做到这一点没有问题ConcurrentQueue并且只是保护一个正常的Queue with a lock, 像这样:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

但是,我喜欢它的便利性ConcurrentQueue而且由于我刚刚学习 C#,我对 API 很好奇;所以我的问题是,有没有办法对并发集合之一执行此操作?

是否有某种方法可以访问任何同步对象ConcurrentQueue为了我自己的目的而使用并锁定它,以便一切都能很好地协同工作?然后我可以锁定它,拿走所有东西,然后释放?


这取决于你想做什么。根据评论中的源代码 https://referencesource.microsoft.com/mscorlib/R/18bcbcbdddbcfdcb.html

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

这是通过内部调用来实现的ToList() https://referencesource.microsoft.com/mscorlib/R/c8a173f18e484abb.html这反过来又适用于m_numSnapshotTakers and a spin机制

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

如果您想要的只是快照,那么您很幸运。然而,似乎没有内置的方法来获取和删除所有项目ConcurrentQueue以线程安全的方式。您需要使用以下方法来烘焙自己的同步lock或类似的。或者自己动手(查看源代码可能并不那么困难)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

原子地从 ConcurrentQueue 中获取所有内容 的相关文章

随机推荐