我有一个使用 PLINQ 并行化的计算,如下所示:
这就是有趣的地方:由于文件的限制
提供的读卡器类型source
, 我需要source
成为
在初始线程上枚举,而不是在并行工作线程上枚举。我需要
的全面评价source
to be bound到主要
线。然而,似乎来源实际上是在工人身上枚举的
线程。
我的问题是:有没有一种直接的方法来修改这段代码
绑定枚举source
到初始线程,同时
将繁重的工作分担给并行工人?请记住
只是做一个热切的事.ToList()
之前AsParallel()
在这里不是一个选项,
因为来自文件的数据流很大。
下面是一些示例代码,演示了我所看到的问题:
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;
public class PlinqTest
{
private static string FormatItems<T>(IEnumerable<T> source)
{
return String.Format("[{0}]", String.Join(";", source));
}
public static void Main()
{
var expectedThreadIds = new[] { Thread.CurrentThread.ManagedThreadId };
var threadIds = Enumerable.Range(1, 1000)
.Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
.AsParallel()
.WithDegreeOfParallelism(8)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.AsOrdered()
.Select(x => x) // (2)
.ToArray();
// In the computation above, the lambda in (1) is a
// stand in for the file-reading operation that we
// want to be bound to the main thread, while the
// lambda in (2) is a stand-in for the "expensive
// computation" that we want to be farmed out to the
// parallel worker threads. In fact, (1) is being
// executed on all threads, as can be seen from the
// output.
Console.WriteLine("Expected thread IDs: {0}",
FormatItems(expectedThreadIds));
Console.WriteLine("Found thread IDs: {0}",
FormatItems(threadIds.Distinct()));
}
}
我得到的示例输出是:
Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]
如果您放弃 PLINQ 并仅显式使用任务并行库,那么这相当简单(尽管可能不那么简洁):
// Limits the parallelism of the "expensive task"
var semaphore = new SemaphoreSlim(8);
var tasks = Enumerable.Range(1, 1000)
.Select(x => Thread.CurrentThread.ManagedThreadId)
.Select(async x =>
{
await semaphore.WaitAsync();
var result = await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
semaphore.Release();
return result;
});
return Task.WhenAll(tasks).Result;
请注意,我正在使用Tuple.Create
记录来自主线程的线程 ID 和来自派生任务的线程 ID。根据我的测试,前者对于每个元组总是相同的,而后者则不同,这是应该的。
信号量确保并行度永远不会超过 8(尽管创建元组的成本较低,但这种情况不太可能发生)。如果达到 8,任何新任务都将等待,直到信号量上有可用的位置。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)