创造一个表演者同时实现像埃拉托斯特尼筛法这样的算法比创建高性能的单线程实现要困难一些。原因是您需要找到一种方法来划分工作,以最大限度地减少并行工作线程之间的通信和干扰。
如果实现完全隔离,那么您可以希望速度提高到接近可用逻辑处理器的数量,或者在典型的现代 PC 上大约提高一个数量级。相比之下,使用一个不错的单线程实现的筛子将使您的速度提高至少两到三个数量级。一个简单的逃避方法是在需要时简单地从文件中加载数据,或者使用像 Kim Walisch 的类似的优质筛选程序总理筛 https://github.com/kimwalisch/primesieve.
即使我们只想研究并行化问题,仍然有必要对算法本身及其运行的机器有一些了解。
最重要的方面是,现代计算机具有较深的缓存层次结构,其中只有 L1 缓存(通常为 32 KB)可以全速访问,而所有其他内存访问都会产生严重的损失。转换为埃拉托斯特尼筛法,这意味着您需要一次对目标范围进行 32 KB 窗口的筛选,而不是跨过许多兆字节来跨越每个质数。在平行舞蹈开始之前,必须筛选直到目标范围末端的平方根的小素数,但随后可以独立筛选每个段或窗口。
筛选给定的窗口或段需要确定要筛选的小素数的起始偏移量,这意味着每个窗口和除法对每个小素数至少进行一次模除法是一项极其缓慢的操作。但是,如果您筛选连续的段而不是放置在范围内任何位置的任意窗口,那么您可以保留向量中每个素数的结束偏移量,并将它们用作下一段的起始偏移量,从而消除起始偏移量的昂贵计算。
因此,埃拉托斯特尼筛法的一种有前途的并行化策略是为每个工作线程提供一组连续的 32 KB 块进行筛选,这样每个工作线程只需进行一次起始偏移计算。这样,工作人员之间就不会出现内存访问争用,因为每个工作人员都有自己独立的目标范围子范围。
然而,在开始并行化之前——即让你的代码变得更复杂——你应该首先精简它,并将需要完成的工作减少到绝对必要的程度。例如,看一下代码中的这个片段:
for (int i = prime; i * prime <= size; i++)
list[i * prime] = false;
不要在每次迭代中重新计算循环边界并使用乘法进行索引,而是根据预先计算的循环不变值检查循环变量并将乘法减少为迭代加法:
for (int o = prime * prime; o <= size; o += prime)
list[o] = false;
有两种简单的特定于筛子的优化可以提供显着的快艇。
1) 将偶数从筛子中剔除,并在需要时凭空拉出质数 2。宾果,你的表现刚刚提高了一倍。
2) 不要用小奇素数 3、5、7 等筛选每个段,而是在该段(甚至整个范围)上喷射预先计算的模式。这节省了时间,因为这些小素数在每个部分中进行了很多很多步骤,并且占据了筛分时间的大部分。
还有更多可能的优化,包括一些更容易实现的目标,但要么回报递减,要么努力曲线急剧上升。尝试搜索代码审查 https://codereview.stackexchange.com/为“筛子”。另外,不要忘记,除了算法问题和机器架构之外,您还要与 Java 编译器作斗争,即诸如数组边界检查之类的事情,您的编译器可能无法将其从循环中提升出来。
给您一个大概的数字:具有预先计算模式的单线程分段仅赔率筛选器可以在 C# 中在 2 到 4 秒内筛选整个 32 位范围,具体取决于除了上述内容之外您还应用了多少 TLC。在我老化的笔记本上,您的素数问题要小得多,直到 100000000 (1e8),不到 100 毫秒就得到了解决。
下面的一些代码显示了窗口筛选的工作原理。为了清楚起见,我在读出素数时放弃了所有优化,例如仅赔率表示或轮 3 步进等。它是 C#,但应该与 Java 足够相似以便可读。
注意:我称之为筛数组eliminated
因为 true 值表示一个被划掉的数字(可以避免在开始时用所有 true 填充数组,而且无论如何它都更符合逻辑)。
static List<uint> small_primes_between (uint m, uint n)
{
m = Math.Max(m, 2);
if (m > n)
return new List<uint>();
Trace.Assert(n - m < int.MaxValue);
uint sieve_bits = n - m + 1;
var eliminated = new bool[sieve_bits];
foreach (uint prime in small_primes_up_to((uint)Math.Sqrt(n)))
{
uint start = prime * prime, stride = prime;
if (start >= m)
start -= m;
else
start = (stride - 1) - (m - start - 1) % stride;
for (uint j = start; j < sieve_bits; j += stride)
eliminated[j] = true;
}
return remaining_numbers(eliminated, m);
}
//---------------------------------------------------------------------------------------------
static List<uint> remaining_numbers (bool[] eliminated, uint sieve_base)
{
var result = new List<uint>();
for (uint i = 0, e = (uint)eliminated.Length; i < e; ++i)
if (!eliminated[i])
result.Add(sieve_base + i);
return result;
}
//---------------------------------------------------------------------------------------------
static List<uint> small_primes_up_to (uint n)
{
Trace.Assert(n < int.MaxValue); // size_t is int32_t in .Net (!)
var eliminated = new bool[n + 1]; // +1 because indexed by numbers
eliminated[0] = true;
eliminated[1] = true;
for (uint i = 2, sqrt_n = (uint)Math.Sqrt(n); i <= sqrt_n; ++i)
if (!eliminated[i])
for (uint j = i * i; j <= n; j += i)
eliminated[j] = true;
return remaining_numbers(eliminated, 0);
}