如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)

2024-03-31

我用 C++ 编写了一个间接基数排序算法(间接,我的意思是它返回项目的索引):

#include <algorithm>
#include <iterator>
#include <vector>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

它的执行速度似乎比std::sort当我使用以下代码对其进行测试时(3920 毫秒与 6530 毫秒相比):

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

#include <stdio.h>
#include <time.h>

#include <array>

int main(void)
{
    typedef std::vector<std::array<size_t, 3> > Items;
    Items items(1 << 24);
    std::vector<size_t> ranks(items.size() * 2);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;
        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = xorshf96() & 0xFFF; }
    }
    clock_t const start = clock();
    if (1) { radix_isort(items.begin(), items.end(), ranks.begin()); }
    else  // STL sorting
    {
        std::sort(
            ranks.begin(),
            ranks.begin() + items.size(),
            make_key_comp(make_deref(std::bind1st(
                add<Items::const_iterator, ptrdiff_t>(),
                items.begin()))));
    }
    printf("%u ms\n",
        (unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
        std::min(ranks.begin(), ranks.end()));
    return 0;
}

嗯,我想这就是我能做的最好的了。

但经过多次用头撞墙后,我意识到在开始时预取radix_ipass可以帮助将结果减少到1440 ms (!):

#include <xmmintrin.h>

...

    for (It1 j = begin; j != end; ++j)
    {
#if defined(_MM_TRANSPOSE4_PS)  // should be defined if xmmintrin.h is included
        enum { N = 8 };
        if (end - j > N)
        { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
#endif
        ...
    }

显然,瓶颈是内存带宽——访问模式是不可预测的。

所以现在我的问题是:我还能做些什么来使其在相似数据量上更快?

还是没有太多改进的空间?

(我希望尽可能避免损害代码的可读性,因此如果可读性受到损害,那么改进应该是显着的。)


使用结合排名和值的更紧凑的数据结构可以提高性能std::sort2-3 倍。本质上,排序现在运行在vector<pair<Value,Rank>>. The Value数据类型,std::array<integer_type, 3>已被替换为更紧凑的pair<uint32_t, uint8_t>数据结构。只有半个字节未使用,并且<比较可以分两步完成,首先使用可能有效的比较uint32_ts(不清楚是否使用的循环std::array<..>::operator<可以优化为类似的快速代码,但是替换std::array<integer_type,3>通过这种数据结构产生了另一个性能提升)。

尽管如此,它还是没有基数排序那么有效。 (也许您可以通过预取来调整自定义快速排序?)

除了额外的排序方法之外,我还替换了xorshf96 by a mt19937,因为我知道如何为后者提供种子;)

种子和值的数量可以通过两个命令行参数更改:首先是种子,然后是计数。

用g++ 4.9.0 20131022编译,使用-std=c++11 -march=native -O3, 对于 64 位 Linux

样品运行;重要的提示在 Core2Duo 处理器 U9400 上运行(又老又慢!)



item count: 16000000
using std::sort
duration: 12260 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true


seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4290 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4270 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4280 ms
result sorted: true


item count: 16000000
using radix sort
duration: 3790 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3820 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3780 ms
result sorted: true
  

新的或更改的代码:

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";

    size_t const items_count = argc > 2 ? std::atoll(argv[2])
                                        : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}

完整代码:

#include <algorithm>
#include <iterator>
#include <vector>

#include <cstddef>
using std::size_t;
using std::ptrdiff_t;

#include <xmmintrin.h>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        #if defined(_MM_TRANSPOSE4_PS)
            constexpr auto N = 8;
            if(end - j > N)
            { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
        #else
            #error SS intrinsic not found
        #endif

        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";
    size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何优化间接基数排序? (又名如何优化不可预测的内存访问模式) 的相关文章

随机推荐

  • 释放分配的内存

    这是好的做法吗 或者我应该替换之间的代码块 and 有一个功能 它可以重复使用 我承认 但我这样做的唯一动机是取消分配colsum因为它是huge并且不需要这样我就可以释放分配的内存 vector
  • 显示 HTML 页面的外部加载资源的文件大小

    文章中 我们如何保持 GitHub 的快速运行 https github com blog 1252 how we keep github fast 他们描述了一个工具栏 GitHub 工作人员可以使用该工具栏查看每个页面加载的各种性能指标
  • div悬停背景颜色改变?

    我怎样才能使它的行为就像一行 div 是锚点一样 所以当我将鼠标悬停在它上面时 它会返回红色 CSS e width 90px border right 1px solid 222 text align center float left
  • Doxygen:在不更改 CSS 的情况下显示数字 Enum 值?

    我需要一种方法让 doxygen 在 doxygen 输出中显示枚举成员的实际值 例如我有 MyEnum typedef enum My Enum MY ENUM 0
  • FQL 图形 API:共同的朋友

    我知道您无法找到朋友的朋友 但是您可以使用某些查询以计算有效的方式找到共同的朋友吗 我什至可以遍历我的朋友来查看他们中哪些是有目标的朋友吗 有什么好的办法可以交到朋友吗 你可以这样做 facebook new Facebook array
  • 我可以像对待数组一样对待结构吗?

    我有一个用于保存 4D 矢量的结构 struct float x float y float z float w vector4f 我正在使用一个库 该库具有一些对向量进行操作但以浮点指针作为参数的函数 调用类似的东西是否合法doSomet
  • 如何使用 Groovy 添加 XML 属性?

    我需要将 属性添加到 Groovy 中 XML 片段的根元素 我想用XmlSlurper 怎么做 添加元素很容易 在 Groovy 控制台中运行此命令以验证其是否有效 import groovy xml StreamingMarkupBui
  • 有没有办法通过 MVC 应用程序中的特定控制器操作来提供 Blazor 应用程序?

    我想在现有的 ASP NET Core 3 0 MVC 项目中设置一个控制器操作来为 Blazor 应用程序提供服务 我过去见过与其他 SPA 框架类似的东西 操作的视图仅包含包含捆绑包的脚本标记 通常还有一些其他设置 例如将 SPA 构建
  • Prometheus Java 摘要指标是线程安全的吗?

    普罗米修斯是Java吗Summary对象线程安全 如果我在类中将其声明为静态 则该类的所有实例都将使用它 Prometheus 是否为该度量对象实现了线程安全 Prometheus 客户端库负责处理方向检测的线程安全等细节 例如Summar
  • is_numeric() 与 is_float() 与 is_int()

    我的理解是 if is numeric input true 那么要么 is float input true OR is int input true OR input 0 OR input是一个数字字符串 意味着如果没有用引号括起来 它
  • FFMPEG 在视频末尾添加图像

    我需要使用 FFMPEG 在 mp4 视频文件末尾添加一秒钟的图像 我的视频尺寸是 WxH 图像尺寸是 MxM 因此视频和图像尺寸不同 我尝试了不同的选项 以便在视频末尾添加图像 ffmpeg i concat videoIn mp4 im
  • iOS 将照片保存在应用程序特定的相册中

    我正在创建一个 iOS 5 应用程序 我想将照片保存到设备中 我想将照片保存到我的应用程序特定的相册中 因此我需要创建相册 然后将照片保存到相册中 我知道如何创建相册 ALAssetsLibrary library ALAssetsLibr
  • R:加速“group by”操作

    我有一个模拟 中间有一个巨大的聚合和组合步骤 我使用 plyr 的 ddply 函数对这个过程进行了原型设计 它可以很好地满足我的大部分需求 但我需要更快的聚合步骤 因为我必须运行 10K 次模拟 我已经在并行扩展模拟 但如果这一步骤更快
  • 设计:新错误(可加密)

    我已经一周没有碰过我的代码了 但是当我捆绑并尝试运行我的网络服务器时 我现在收到以下错误 这让我死在了水里 按照错误输出中的建议包含可设计加密的 gem 并不能解决问题 并且仍然会导致相同的错误 任何帮助是极大的赞赏 DEVISE Devi
  • Reactjs:使用 shouldComponentUpdate() 停止在特定状态更改时重新渲染

    我的组件 onload 中有多个 setState 我的页面在单击下拉值时重新呈现 因为单击时我通过以下方式存储这些值setState 为了停止点击时重新渲染 我使用下面的代码 shouldComponentUpdate return fa
  • Nginx 反向代理配置

    我在使用 nginx 进行简单配置时遇到问题 我有一台托管 docker 容器的服务器 因此 nginx 位于容器中 所以我们调用 urlfoo com 我想要网址foo com service1实际上只是去另一个端口上的 foo com
  • CSS 中如何使用“大于”或“>”字符?

    我在 CSS 文件中多次看到这个字符 但我不知道它是如何使用的 谁能向我解释一下并展示它们如何使页面样式变得更容易 这是一个 CSS 子选择器 P gt SPAN表示将以下样式应用于作为子级的所有 SPAN 标记P tag 请注意 孩子 的
  • Visual Studio Code 中是否有像 DocBlockr 这样的代码注释功能?

    我使用 Sublime Text 3 Atom io 和 Bracket io 作为以前的编辑器 我也非常喜欢新的 Mac 版 Visual Studio 代码编辑器 Sublime 有类似 DocBlockr 的代码注释功能吗 这对我来说
  • Android Studio 0.5.9错误代码42

    每次我尝试从 eclipse 导入项目时 我都会收到这样的错误 Error Execution failed for task app mergeDebugResources 错误 无法运行命令 D Android sdk build to
  • 如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)

    我用 C 编写了一个间接基数排序算法 间接 我的意思是它返回项目的索引 include