使用 SIMD 优化一维热方程

2024-02-17

我正在使用 CFD 代码(用于计算流体动力学)。我最近有机会看到英特尔编译器在我的一个循环中使用 SSE,使该循环中的计算性能提高了近 2 倍。不过,SSE和SIMD指令的使用似乎更像是运气。大多数时候,编译器什么也不做。

然后我尝试强制使用 SSE,考虑到 AVX 指令将在不久的将来加强这方面。

我制作了一个简单的一维传热代码。它由两个阶段组成,使用另一个阶段的结果(U0 -> U1,然后 U1 -> U0,然后 U0 -> U1,等等)。当迭代时,它收敛到稳定解。我的主代码中的大多数循环都使用相同类型的计算。 (有限差分)。

但是,我的代码比正常循环慢两倍。结果相同,因此计算是一致的。

我做错了吗?在超级计算机(使用 Westmer)上测试之前,我使用 Core 2 来测试循环。

这是代码,先是 SSE 循环,然后是参考循环:

#include <stdio.h>
#include <emmintrin.h>
#include <time.h>
//#include <vector>
#define n1 1004
#define niter 200000

int i,j,t;

double U0[n1] __attribute__ ((aligned(16)));
double U1[n1] __attribute__ ((aligned(16)));
double Dx,Dy,Lx,Ly,InvDxDx,Dt,alpha,totaltime,Stab,DtAlpha,DxDx;

__m128d vmmx00;
__m128d vmmx01;
__m128d vmmx02;
__m128d vmmx10;

__m128d va;
__m128d vb;
__m128d vc;
__m128d vd;

clock_t time0,time1;
FILE *f1;
int main()
{
/* ---- GENERAL ---- */
   alpha = 0.4;
   totaltime = 1.0/100.0;
   Dt = totaltime/((niter-1)*1.0);
   Lx = 1.0;
   Dx = Lx/((n1-1)*1.0);
   InvDxDx = 1.0/(Dx*Dx);
   DxDx = Dx*Dx;
   Stab = alpha*Dt*(InvDxDx);
   DtAlpha = Dt*alpha;
/* Stability if result <= 0.5 */
   printf("Stability factor : %f \n",Stab);


   for( i = 0; i < n1; i++){U0[i] = 0.0;}
   U0[1] = 1.0;
   U0[2] = 1.0;
   U0[3] = 1.0;
   U0[n1-2] = 2.0;

//    for ( i = 0; i < n1; i++) {
 //       for ( j = i + 1; j < n2; j++) {
  //          std::swap(U0[i][j], U0[j][i]);
   //     }
    //}

    va = _mm_set1_pd(-2.0);
    vb = _mm_set1_pd(InvDxDx);
    vd = _mm_set1_pd(DtAlpha);

 time0=clock();

 for( t = 0; t < niter; t++)
 {

    for( i = 2; i < n1-2; i+=2)
    {
      //printf("%d  %d  \n",i,j);
      //fflush(stdout);
      vmmx00 = _mm_load_pd(&U0[i]);
      vmmx01 = _mm_loadu_pd(&U0[i+1]);
      vmmx02 = _mm_loadu_pd(&U0[i-1]);

      vmmx10 = _mm_mul_pd(va,vmmx00);   // U1[i][j] = -2.0*U0[i][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx01);   // U1[i][j] = U1[i][j] + U0[i+1][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx02);   // U1[i][j] = U1[i][j] + U0[i-1][j];
      vmmx10 = _mm_mul_pd(vb,vmmx10);   // U1[i][j] = U1[i][j] * InvDxDx;

      vmmx10 = _mm_mul_pd(vd,vmmx10);   // U1[i][j] = U1[i][j] * DtAlpha;
      vmmx10 = _mm_add_pd(vmmx10,vmmx00);   // U1[i][j] = U1[i][j] + U0[i][j];

      _mm_store_pd(&U1[i],vmmx10);

      // U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
    }

    for( i = 2; i < n1-2; i+=2)
    {
      //printf("%d  %d  \n",i,j);
      //fflush(stdout);
      vmmx00 = _mm_load_pd(&U1[i]);
      vmmx01 = _mm_loadu_pd(&U1[i+1]);
      vmmx02 = _mm_loadu_pd(&U1[i-1]);

      vmmx10 = _mm_mul_pd(va,vmmx00);   // U0[i][j] = -2.0*U1[i][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx01);   // U0[i][j] = U0[i][j] + U1[i+1][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx02);   // U0[i][j] = U0[i][j] + U1[i-1][j];
      vmmx10 = _mm_mul_pd(vb,vmmx10);   // U0[i][j] = U0[i][j] * InvDxDx;

      vmmx10 = _mm_mul_pd(vd,vmmx10);   // U0[i][j] = U0[i][j] * DtAlpha;
      vmmx10 = _mm_add_pd(vmmx10,vmmx00);   // U0[i][j] = U0[i][j] + U1[i][j];

      _mm_store_pd(&U0[i],vmmx10);

      // U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
    }

 }

 time1=clock();
 printf("Loop 0, total time : %f \n", (double) time1-time0);
 f1 = fopen ("out0.dat", "wt");
 for( i = 1; i < n1-1; i++)
 {
       fprintf (f1, "%d\t%f\n", i, U0[i]);
 }


// REF

   for( i = 0; i < n1; i++){U0[i] = 0.0;}
   U0[1] = 1.0;
   U0[2] = 1.0;
   U0[3] = 1.0;
   U0[n1-2] = 2.0;

 time0=clock();

 for( t = 0; t < niter; t++)
 {

    for( i = 2; i < n1-2; i++)
    {
       U1[i] = U0[i] + DtAlpha* (U0[i+1]-2.0*U0[i]+U0[i-1])*InvDxDx;
    }

    for( i = 2; i < n1-2; i++)
    {
       U0[i] = U1[i] + DtAlpha* (U1[i+1]-2.0*U1[i]+U1[i-1])*InvDxDx;
    }

 }

 time1=clock();
 printf("Loop 0, total time : %f \n", (double) time1-time0);
 f1 = fopen ("outref.dat", "wt");
 for( i = 1; i < n1-1; i++)
 {
       fprintf (f1, "%d\t%f\n", i, U0[i]);
 }



}

++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++

Edit :

++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++

考虑到您的回答,我找到了合适的地方来讨论这个问题,所以我将扩大主题并解释我的目的。如果您接受,我们将逐一讨论所有循环。这可能很长,但对于我所在领域的很多人以及 OpenFoam 等开源求解器来说可能非常有用。不考虑对能源消耗的影响(我们都使用大型超级计算器)。

我使用的 CFD 代码在 512 个 Westmer 核心上运行需要 1 个多月。我正在使用 MPI(消息传递接口)在进程之间进行通信。 物理场可以被视为网格,即 1D、2D 或 3D 阵列,具体取决于模拟类型。但 3D 是您能想象到的最好的。

完整的代码是 Fortran 95,它实际上是 C 的简化版。它很容易与 C 接口,并且可以直接从 Fortran 调用 C 例程,类型相同(int、double、long 等)。 但是,Fortran 不允许这样的优化:它被设计得很简单。这就是我研究 C 指令的原因。

在所有 CFD 代码中,我们都面临着相同的问题:3 种类型的循环和 MPI 内存分配。让我们首先讨论循环:

  • 空间导数(称为有限差分): 该循环由一维卷积组成对于所有情况(1D、2D、3D,一次只能在一把斧头上求导)(DF = F[i-1]*A + F[i]*B + F[i+1]*C)。然而,当使用超过 1D 时,内存访问变成如下:

    // x1 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C
    
    // x2 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
    

    在第一个循环中,内存访问没有继续(Fortran 中的 invert,内存被反转)。这是第一个问题。使用 3D 数组时同上。

  • 泊松方程解析,即矩阵乘法: 该循环由 1D、2D 或 3D 卷积组成,取决于模拟。这实际上是二阶导数 (DDF = D(DF))。

    for i 1 -> n1
        for j 1 ->n2
            DDF[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C + F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
    

    这个循环与我第一次给你的循环相同,但它是直接计算的,而不是偶数和奇数。

  • 加权高斯赛德尔分辨率,即与下面相同的循环,但具有依赖性:

    // even
    for i 1 -> n1
        for j 1 ->n2
            F1[i][j] = F0[i-1][j]*A + F0[i][j]*B + F0[i+1][j]*C + F0[i][j-1]*D + F0[i][j]*E + F0[i][j+1]*G
    
    //odd
    for i 1 -> n1
        for j 1 ->n2
            F0[i][j] = F1[i-1][j]*A + F1[i][j]*B + F1[i+1][j]*C + F1[i][j-1]*D + F1[i][j]*E + F1[i][j+1]*G
    

    这是您之前研究过的循环。

然后,我们面临另一个问题:内存分配。每个核心都有自己的内存,并且需要与其他核心共享。 让我们考虑最后一个循环,但经过简化:

    for t 1 -> niter

        // even
        for i 1 -> n1-2
                F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

        //odd
        for i 1 -> n1-2
                F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

假设n1=512,但由于RAM容量较小,因此无法存储在本地内存中。内存分布在 core0 (1->255) 和 core1 (256-512) 上,它们不在同一台计算机上,而是在网络上。 在这种情况下,i=256 中的导数需要知道点 i=255,但该值位于另一个过程上。包含其他处理器的值的内存称为 GHOST 内存。所以循环是:

    ! update boundary memory :
    Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1 (don't forget that for core1, the array restart from 0)
    Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0 (you understand that F0[256] is the ghost for core0, and F0[0] is the ghost for core1)
    // even, each core do this loop.
    for i 1 -> n1-2
            F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

    ! update boundary memory :
    Share to ghost : core0 : F1[255] -> Network -> F1[0] : core1
    Share to ghost : core1 : F1[1] -> Network -> F1[256] : core0

    //odd, each core do this loop.
    for i 1 -> n1-2
            F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

我们需要处理这个问题。神秘的,你现在明白我要说什么了:循环交错需要考虑到这一点。 我认为可以这样做:

        ! update boundary memory :
        Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1 
        Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0

        for t 1 -> niter

        ! compute borders in advance :
        core0 only : F1[255] = F0[254]*A + F0[255]*B + F0[256]*C
        core1 only : F1[1] = F0[0]*A + F0[1]*B + F0[2]*C

        Launch Share to ghost asynchronous : core0 : F1[255] -> Network -> F1[0] : core1 
        Launch Share to ghost asynchronous : core1 : F1[1] -> Network -> F1[256] : core0

        During the same time (this can be done at the same time because MPI support asynchronous communications)
        // even
        for i 2 -> n1-3 (note the reduced domain)
                F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

        Check that communications are done.


        ! compute borders in advance :
        core0 only : F0[255] = F1[254]*A + F1[255]*B + F1[256]*C
        core1 only : F0[1] = F1[0]*A + F1[1]*B + F1[2]*C

        Launch Share to ghost asynchronous : core0 : F0[255] -> Network -> F0[0] : core1 
        Launch Share to ghost asynchronous : core1 : F0[1] -> Network -> F0[256] : core0

        //odd, each core do this loop.
        for i 2 -> n1-3
                F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

        Check that communications are done.

希望我没有在某个地方的索引上犯错误。 我们暂时考虑第一种循环,它是最简单的,之后我们可以进行循环 2 和循环 3,它们是类似的。目的是做到这一点(类似于图像处理):

    // x1 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C

    // x2 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G

我正在研究它,我将在几个小时内发布结果代码,并考虑您的建议。


Mysticial 在代码中的智慧之言:

  xmm7 = InvDxDx * dtAlpha; // precalculate
  xmm6 = -2*xmm7;
  xmm0 = *ptr++;   // has values d0, d1
  xmm1 = *ptr++;   // has values d2, d3
  while (loop--) {
     xmm2  = xmm0;  // take a copy of d0,d1
     xmm0 += xmm1;  // d0+d2, d1+d3
     xmm2 = shufps (xmm2,xmm1, 0x47);  // the middle elements d1,d2 ??
     xmm0 *= xmm7;  // sum of outer elements * factor
     xmm2 *= xmm6;  // -2 * center element * factor
     // here's still a nasty dependency
     xmm2 += xmm0;
     xmm0 = xmm1;    // shift registers
     *out_ptr ++= xmm2;  // flush
     xmm1 = *ptr++;  // read in new values

  }

这可以通过将循环分为 2 段来改进,每个段工作 502 个条目,或者与不同的任务分开,并交错结果,从而打破依赖链。

此外,可以通过将循环展开两次并在其他情况下更改 xmm0 和 xmm1 的含义来避免移位寄存器方法 xmm0

这指出了另一个大问题:当使用内在函数时,寄存器总是会溢出到内存中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 SIMD 优化一维热方程 的相关文章

随机推荐

  • 为什么不使用 django-admin startapp mysite 生成 urls.py?

    但必须由用户创建 project settings py mysite views py apps py models py user created urls py file 应用程序不需要有 url 视图或任何东西 它也可以只是模板的集
  • 何时删除 Git 中的分支?

    假设我们有一个稳定的应用程序 明天 有人报告了一个大错误 我们决定立即修复 因此 我们为 master 的修补程序创建了一个分支 将其命名为 2011 Hotfix 并将其向上推送 以便所有开发人员都可以协作修复它 我们修复了该错误 并将
  • UpSetR 按颜色集分组

    我盯着这个问题看了几个小时 似乎没有找到解决方案 我希望 upSet 图按集合着色 例如 library UpSetR movies lt read csv system file extdata movies csv package Up
  • 谱系图数据库[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 有人可以向我指出谱系图数据库的有效使用吗 我想学习 neo4j 并且使用 python 所以我想为自己制
  • Flutter - InkWell 对 Flex 内的 onTap 没有反应

    我想弄清楚 为什么onTap 我的 InkWell 内部的方法不起作用 InkWell 小部件位于Flexible小部件 这Flexible小部件也在里面 ARow 这是我的代码 TextEditingController controll
  • 对于小 x、大 y 值,有效的 HashCode() 是什么?

    我使用 HashMap 将 x y 值映射到笛卡尔平面上 对于非常小的 x 和非常大的 y 值 有效的 HashCode 是什么 目前我正在使用 public int hashCode return y 31 x Typical x y v
  • 计算 .Net BitArray 类中设置的位

    我正在实现一个库 其中广泛使用 Net BitArray 类 并且需要与 Java BitSet Cardinality 方法等效的方法 即返回设置的位数的方法 我正在考虑将其实现为 BitArray 类的扩展方法 简单的实现是迭代和计算位
  • Matlab 匿名函数 If else

    在 MATLAB 中 我尝试对元胞数组执行一个函数 但运气不佳 我想创建一个cellfun它检查是否str2double回报NaN值 然后执行str2double关于不存在的值NaNs 我试图使用一个匿名函数 其中包含 IF Else 类型
  • 在短语中搜索单词并返回匹配项

    我需要建立一个公式 如果在X列中 有B列的值 无论位置 则返回A的值 Column A Amoxicilina Azitromicina Cetoconazol Column B Amoxicilina Esomeprazol Clarit
  • 计算二维数组(矩阵)每列的总计

    给定下面的数组 如何创建具有匹配键的求和数组 arr alpha 1 2 3 4 5 beta 1 2 3 4 5 gamma 1 2 3 4 5 delta 1 2 3 4 5 这就是我最终想要的 4 8 12 16 20 这是最有效的方
  • CSS 类选择器

    我想选择一个divclass c id i c color red border 1px font size 25px background color yellow width 200px div class c change div 现
  • 如何为每个成员选择最新的输入?

    假设我有一个 MySQL 表 time mid field 1 field 2 100 1 32 54 100 2 0 34 100 3 44 99 200 1 0 45 200 2 0 45 200 3 4 59 200 4 45
  • Intellij idea 堆大小无法更改

    有一天 我运行一些巨大的东西 弹出一个窗口说堆内存内存不足 我在该窗口中将其设置为 2014M 然后单击继续 一切都很好 但我不喜欢数字 2014 我想要 2048 所以 我更改了 Xmx 选项idea64 exe vmoptions 如下
  • Facebook Graph API - 通过一次查询访问评论及其回复

    以下 Facebook Graph API v2 6 查询 POST ID fields comments summary true access token ACCESS TOKEN 将获取指定帖子的评论 及其 ID 然后查询 COMME
  • 使用 C++ 解析 ONNX 模型。使用 C++ 从 onnx 模型中提取层、输入和输出形状

    我正在尝试从 onnx 模型中提取输入层 输出层及其形状等数据 我知道有 python 接口可以做到这一点 我想做类似的事情code https stackoverflow com questions 56734576 find input
  • 在Lua中,#INF和#IND是什么?

    我对 Lua 还很陌生 在测试时我发现 INF IND 但是 我找不到解释它的好参考资料 什么是 INF IND 以及类似的 例如底片 以及如何生成和使用它们 INF是无限的 IND是 NaN 测试一下 print 1 0 print 0
  • xcode4:可靠地检测项目/工作空间的 DerivedData 目录

    Xcode 4 将所有内容构建到 HOME Library Developer Xcode DerivedData PROJECT UUID where UUID是一个看似随机的字符串 它不是真正随机的 只是看起来随机 我怎样才能可靠地检测
  • 如何制作一个以程序为目标并在文本框中键入内容的 .bat

    我不知道从哪里开始 我见过这样的答案 但我不知道如何根据我想要的格式设置它们 我只需要在 Minecraft 服务器打开时将其定位 并通过在控制台中键入 stop 来关闭它 我没有可显示的代码 但这将位于其他文件中 以便我可以启动它 然后让
  • 核心数据:NSFetchRequest 按对多关系计数排序

    假设我有一个父实体 每个实体都有多个子实体 我想让所有父母按孩子的数量排序 类似于以下伪代码 NSEntityDescription entity NSEntityDescription entityForName Parent inMan
  • 使用 SIMD 优化一维热方程

    我正在使用 CFD 代码 用于计算流体动力学 我最近有机会看到英特尔编译器在我的一个循环中使用 SSE 使该循环中的计算性能提高了近 2 倍 不过 SSE和SIMD指令的使用似乎更像是运气 大多数时候 编译器什么也不做 然后我尝试强制使用