执行摘要:如果您想继续使用 C#,Reed 下面的答案是最快的。如果您愿意编组到 C++(我就是),那么这是一个更快的解决方案。
我在 C# 中有两个 55mb ushort 数组。我使用以下循环将它们组合起来:
float b = (float)number / 100.0f;
for (int i = 0; i < length; i++)
{
image.DataArray[i] =
(ushort)(mUIHandler.image1.DataArray[i] +
(ushort)(b * (float)mUIHandler.image2.DataArray[i]));
}
这段代码,根据前后添加 DateTime.Now 调用,运行时间为 3.5 秒。我怎样才能让它更快?
EDIT:我认为这是一些代码,显示了问题的根源。当以下代码在全新的 WPF 应用程序中运行时,我得到以下计时结果:
Time elapsed: 00:00:00.4749156 //arrays added directly
Time elapsed: 00:00:00.5907879 //arrays contained in another class
Time elapsed: 00:00:02.8856150 //arrays accessed via accessor methods
因此,当直接遍历数组时,时间比数组位于另一个对象或容器内部时要快得多。此代码表明,不知何故,我正在使用访问器方法,而不是直接访问数组。即便如此,我似乎能达到的最快速度是半秒。当我使用 icc 运行 C++ 中的第二个代码清单时,我得到:
Run time for pointer walk: 0.0743338
在这种情况下,C++ 的速度快了 7 倍(使用 icc,不确定使用 msvc 是否可以获得相同的性能——我不太熟悉那里的优化)。有没有办法让 C# 接近 C++ 的性能水平,或者我应该让 C# 调用我的 C++ 例程?
清单 1,C# 代码:
public class ArrayHolder
{
int length;
public ushort[] output;
public ushort[] input1;
public ushort[] input2;
public ArrayHolder(int inLength)
{
length = inLength;
output = new ushort[length];
input1 = new ushort[length];
input2 = new ushort[length];
}
public ushort[] getOutput() { return output; }
public ushort[] getInput1() { return input1; }
public ushort[] getInput2() { return input2; }
}
/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
Random random = new Random();
int length = 55 * 1024 * 1024;
ushort[] output = new ushort[length];
ushort[] input1 = new ushort[length];
ushort[] input2 = new ushort[length];
ArrayHolder theArrayHolder = new ArrayHolder(length);
for (int i = 0; i < length; i++)
{
output[i] = (ushort)random.Next(0, 16384);
input1[i] = (ushort)random.Next(0, 16384);
input2[i] = (ushort)random.Next(0, 16384);
theArrayHolder.getOutput()[i] = output[i];
theArrayHolder.getInput1()[i] = input1[i];
theArrayHolder.getInput2()[i] = input2[i];
}
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
int number = 44;
float b = (float)number / 100.0f;
for (int i = 0; i < length; i++)
{
output[i] =
(ushort)(input1[i] +
(ushort)(b * (float)input2[i]));
}
stopwatch.Stop();
Console.WriteLine("Time elapsed: {0}",
stopwatch.Elapsed);
stopwatch.Reset();
stopwatch.Start();
for (int i = 0; i < length; i++)
{
theArrayHolder.output[i] =
(ushort)(theArrayHolder.input1[i] +
(ushort)(b * (float)theArrayHolder.input2[i]));
}
stopwatch.Stop();
Console.WriteLine("Time elapsed: {0}",
stopwatch.Elapsed);
stopwatch.Reset();
stopwatch.Start();
for (int i = 0; i < length; i++)
{
theArrayHolder.getOutput()[i] =
(ushort)(theArrayHolder.getInput1()[i] +
(ushort)(b * (float)theArrayHolder.getInput2()[i]));
}
stopwatch.Stop();
Console.WriteLine("Time elapsed: {0}",
stopwatch.Elapsed);
}
}
清单 2,C++ 等效项:
//looptiming.cpp :定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <stdlib.h>
#include <windows.h>
#include <stdio.h>
#include <iostream>
int _tmain(int argc, _TCHAR* argv[])
{
int length = 55*1024*1024;
unsigned short* output = new unsigned short[length];
unsigned short* input1 = new unsigned short[length];
unsigned short* input2 = new unsigned short[length];
unsigned short* outPtr = output;
unsigned short* in1Ptr = input1;
unsigned short* in2Ptr = input2;
int i;
const int max = 16384;
for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
*outPtr = rand()%max;
*in1Ptr = rand()%max;
*in2Ptr = rand()%max;
}
LARGE_INTEGER ticksPerSecond;
LARGE_INTEGER tick1, tick2; // A point in time
LARGE_INTEGER time; // For converting tick into real time
QueryPerformanceCounter(&tick1);
outPtr = output;
in1Ptr = input1;
in2Ptr = input2;
int number = 44;
float b = (float)number/100.0f;
for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
*outPtr = *in1Ptr + (unsigned short)((float)*in2Ptr * b);
}
QueryPerformanceCounter(&tick2);
QueryPerformanceFrequency(&ticksPerSecond);
time.QuadPart = tick2.QuadPart - tick1.QuadPart;
std::cout << "Run time for pointer walk: " << (double)time.QuadPart/(double)ticksPerSecond.QuadPart << std::endl;
return 0;
}
EDIT 2:在第二个示例中启用 /QxHost 会将时间降至 0.0662714 秒。按照@Reed的建议修改第一个循环让我开始
已用时间:00:00:00.3835017
所以,对于滑块来说仍然不够快。这个时间是通过代码:
stopwatch.Start();
Parallel.ForEach(Partitioner.Create(0, length),
(range) =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
output[i] =
(ushort)(input1[i] +
(ushort)(b * (float)input2[i]));
}
});
stopwatch.Stop();
EDIT 3根据 @Eric Lippert 的建议,我在发布版本中重新运行 C# 代码,并且不使用附加的调试器,而是将结果打印到对话框中。他们是:
- 简单数组:~0.273s
- 包含的数组:~0.330s
- 访问器数组:~0.345s
- 并行阵列:~0.190s
(这些数字来自 5 次运行的平均值)
因此,并行解决方案肯定比我之前得到的 3.5 秒快,但仍然比使用非 icc 处理器可实现的 0.074 秒要快一些。因此,最快的解决方案似乎是在发行版中进行编译,然后编组为 icc 编译的 C++ 可执行文件,这使得此处可以使用滑块。
EDIT 4:@Eric Lippert 的另外三个建议:将 for 循环内部从 length 更改为 array.length,使用双精度数,并尝试不安全的代码。
对于这三个人来说,现在的时机是:
- 长度:~0.274s
- 双精度数,非浮点数:~0.290s
- 不安全:~0.376s
到目前为止,并行解决方案是最大的赢家。虽然如果我可以通过着色器添加这些,也许我可以看到某种加速......
这是附加代码:
stopwatch.Reset();
stopwatch.Start();
double b2 = ((double)number) / 100.0;
for (int i = 0; i < output.Length; ++i)
{
output[i] =
(ushort)(input1[i] +
(ushort)(b2 * (double)input2[i]));
}
stopwatch.Stop();
DoubleArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
stopwatch.Reset();
stopwatch.Start();
for (int i = 0; i < output.Length; ++i)
{
output[i] =
(ushort)(input1[i] +
(ushort)(b * input2[i]));
}
stopwatch.Stop();
LengthArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
Console.WriteLine("Time elapsed: {0}",
stopwatch.Elapsed);
stopwatch.Reset();
stopwatch.Start();
unsafe
{
fixed (ushort* outPtr = output, in1Ptr = input1, in2Ptr = input2){
ushort* outP = outPtr;
ushort* in1P = in1Ptr;
ushort* in2P = in2Ptr;
for (int i = 0; i < output.Length; ++i, ++outP, ++in1P, ++in2P)
{
*outP = (ushort)(*in1P + b * (float)*in2P);
}
}
}
stopwatch.Stop();
UnsafeArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
Console.WriteLine("Time elapsed: {0}",
stopwatch.Elapsed);