根据评论更新:
我不同意线程池无法处理您遇到的工作负载的说法......让我们看看您的问题并更具体:
1. 您有近 1000 个文件。
2. 每个文件可能需要长达 2 分钟的 CPU 密集型工作来处理。
3. 您希望通过并行处理来提高吞吐量。
4. 您希望在每个文件完成时发出信号并更新 UI。
实际上,您不想运行 1000 个线程,因为您受到拥有的核心数量的限制...并且由于它是 CPU 密集型工作,因此您可能会用很少的线程来最大化 CPU 负载(在我的程序中,它是通常每个核心有 2-4 个线程是最佳的)。
所以你不应该加载 1000 个工作项ThreadPool
并期望看到吞吐量的增加。您必须创建一个始终以最佳数量的线程运行的环境,这需要一些工程设计。
我将不得不稍微反驳我最初的陈述,并实际上推荐生产者/消费者设计。看看这个question https://stackoverflow.com/questions/2294485/c-once-the-main-thread-sleep-all-thread-stopped/2295220#2295220有关图案的更多详细信息。
生产者可能如下所示:
class Producer
{
private final CountDownLatch _latch;
private final BlockingQueue _workQueue;
Producer( CountDownLatch latch, BlockingQueue workQueue)
{
_latch = latch;
_workQueue = workQueue;
}
public void Run()
{
while(hasMoreFiles)
{
// load the file and enqueue it
_workQueue.Enqueue(nextFileJob);
}
_latch.Signal();
}
}
这是你的消费者:
class Consumer
{
private final CountDownLatch _latch;
private final BlockingQueue _workQueue;
Consumer(CountDownLatch latch, BlockingQueue workQueue, ReportStatusToUI reportDelegate)
{
_latch = latch;
_workQueue = workQueue;
}
public void Run()
{
while(!terminationCondition)
{
// blocks until there is something in the queue
WorkItem workItem = _workQueue.Dequeue();
// Work that takes 1-2 minutes
DoWork(workItem);
// a delegate that is executed on the UI (use BeginInvoke on the UI)
reportDelegate(someStatusIndicator);
}
_latch.Signal();
}
}
A CountDownLatch
:
public class CountDownLatch
{
private int m_remain;
private EventWaitHandle m_event;
public CountDownLatch(int count)
{
Reset(count);
}
public void Reset(int count)
{
if (count < 0)
throw new ArgumentOutOfRangeException();
m_remain = count;
m_event = new ManualResetEvent(false);
if (m_remain == 0)
{
m_event.Set();
}
}
public void Signal()
{
// The last thread to signal also sets the event.
if (Interlocked.Decrement(ref m_remain) == 0)
m_event.Set();
}
public void Wait()
{
m_event.WaitOne();
}
}
Jicksa 的阻塞队列 http://jaksa76.blogspot.com/2009/03/blocking-queue-in-c.html:
class BlockingQueue<T> {
private Queue<T> q = new Queue<T>();
public void Enqueue(T element) {
q.Enqueue(element);
lock (q) {
Monitor.Pulse(q);
}
}
public T Dequeue() {
lock(q) {
while (q.Count == 0) {
Monitor.Wait(q);
}
return q.Dequeue();
}
}
}
那么剩下什么呢?现在你所要做的就是启动所有线程......你可以在ThreadPool
, as BackgroundWorker
,或每一个作为new Thread
这没有什么区别.
您只需要创建一个Producer
以及最佳数量Consumers
考虑到您拥有的核心数量(每个核心大约 2-4 个消费者),这是可行的。
父线程(NOT你的 UI 线程)应该阻塞,直到所有消费者线程完成:
void StartThreads()
{
CountDownLatch latch = new CountDownLatch(numConsumer+numProducer);
BlockingQueue<T> workQueue = new BlockingQueue<T>();
Producer producer = new Producer(latch, workQueue);
if(youLikeThreads)
{
Thread p = new Thread(producer.Run);
p.IsBackground = true;
p.Start();
}
else if(youLikeThreadPools)
{
ThreadPool.QueueUserWorkItem(producer.Run);
}
for (int i; i < numConsumers; ++i)
{
Consumer consumer = new Consumer(latch, workQueue, theDelegate);
if(youLikeThreads)
{
Thread c = new Thread(consumer.Run);
c.IsBackground = true;
c.Start();
}
else if(youLikeThreadPools)
{
ThreadPool.QueueUserWorkItem(consumer.Run);
}
}
// wait for all the threads to signal
latch.Wait();
SayHelloToTheUI();
}
请注意,上述代码仅供说明之用。您仍然需要向Consumer
和Producer
并且您需要以线程安全的方式执行此操作。