如何从 std::vector 自动删除已完成的 future

2024-04-28

在下面的示例中,mEventExecutors 是一个std::vector<std::future<void>>。我希望能够在完成后从向量中删除未来。这可以做到吗?

void RaiseEvent(EventID messageID)
{
    mEventExecutors.push_back(std::move(std::async([=]{
            auto eventObject = mEventListeners.find(messageID);
            if (eventObject != mEventListeners.end())
            {
                for (auto listener : eventObject->second)
                {
                    listener();
                }
            }
        })
    ));
}

这个问题本身已经被另一个人回答了,但它激起了我的好奇心,想知道如何用最少的代码行实现一个功能齐全、线程安全的任务管理器。

我还想知道是否可以将任务作为未来等待,或者可以选择提供回调函数。

当然,这引出了一个问题:这些 future 是否可以使用性感的延续语法.then(xxx)而不是阻塞代码。

这是我的尝试。

非常感谢克里斯托弗·科尔霍夫 (Christopher Kohlhoff),该书的作者boost::asio。通过研究他出色的工作,我了解到将类别分为以下几类的价值:

  • 句柄 - 控制对象的生命周期
  • 服务 - 提供对象逻辑、在对象实现之间共享的状态,并管理实现对象的生命周期(如果它们比句柄的生命周期长)(任何依赖于回调的事物通常都会这样做),以及
  • 实现提供每个对象的状态。

那么下面是调用代码的示例:

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([]
               {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch(std::exception const& e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}

这将导致以下输出:

task 1 is doing something
task 2 doing something
task 3 doing something
task 3 is done
task 1 done
task 1 completed
task 2 is going to throw
task 2 threw: here is an error

这是完整的代码(在 apple clang 上测试,它比 gcc 更混杂,所以如果我在 lambda 中错过了 this-> ,我很抱歉):

#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_EXECUTORS 1

/* written by Richard Hodges 2017
 * You're free to use the code, but please give credit where it's due :)
 */
#include <boost/thread/future.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <thread>
#include <utility>
#include <unordered_map>
#include <stdexcept>
#include <condition_variable>

// I made a task an object because I thought I might want to store state in it.
// it turns out that this is not strictly necessary

struct task {

};

/*
 * This is the implementation data for one task_manager
 */
struct task_manager_impl {

    using mutex_type = std::mutex;
    using lock_type = std::unique_lock<mutex_type>;

    auto get_lock() -> lock_type {
        return lock_type(mutex_);
    }

    auto add_task(lock_type const &lock, std::unique_ptr<task> t) {
        auto id = t.get();
        task_map_.emplace(id, std::move(t));
    }

    auto remove_task(lock_type lock, task *task_id) {
        task_map_.erase(task_id);
        if (task_map_.empty()) {
            lock.unlock();
            no_more_tasks_.notify_all();
        }
    }

    auto wait(lock_type lock) {
        no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
    }

    // for this example I have chosen to express errors as exceptions
    using error_type = std::exception_ptr;

    mutex_type mutex_;
    std::condition_variable no_more_tasks_;


    std::unordered_map<task *, std::unique_ptr<task>> task_map_;
};

/*
 * This stuff is the protocol to figure out whether to return a future
 * or just invoke a callback.
 * Total respect to Christopher Kohlhoff of asio fame for figuring this out
 * I merely step in his footsteps here, with some simplifications because of c++11
 */
struct use_future_t {
};
constexpr auto use_future = use_future_t();

template<class Handler>
struct make_async_handler {
    auto wrap(Handler handler) {
        return handler;
    }

    struct result_type {
        auto get() -> void {}
    };

    struct result_type result;
};

template<>
struct make_async_handler<const use_future_t &> {
    struct shared_state_type {
        boost::promise<void> promise;
    };

    make_async_handler() {
    }

    template<class Handler>
    auto wrap(Handler &&) {
        return [shared_state = this->shared_state](auto error) {
            // boost promises deal in terms of boost::exception_ptr so we need to marshal.
            // this is a small price to pay for the extra utility of boost::promise over
            // std::promise
            if (error) {
                try {
                    std::rethrow_exception(error);
                }
                catch (...) {
                    shared_state->promise.set_exception(boost::current_exception());
                }
            } else {
                shared_state->promise.set_value();
            }
        };
    }


    struct result_type {
        auto get() -> boost::future<void> { return shared_state->promise.get_future(); }

        std::shared_ptr<shared_state_type> shared_state;
    };

    std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
    result_type result{shared_state};

};

/*
 * Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
 * The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
 * program will not crash horribly at exit time.
 */
struct task_manager_service {

    /*
     * through this function, the service has full control over how it is created and destroyed.
     */

    static auto use() -> task_manager_service&
    {
        static task_manager_service me {};
        return me;
    }

    using impl_class = task_manager_impl;

    struct deleter {
        void operator()(impl_class *p) {
            service_->destroy(p);
        }

        task_manager_service *service_;
    };

    /*
     * defining impl_type in terms of a unique_ptr ensures that the handle will be
     * moveable but not copyable.
     * Had we used a shared_ptr, the handle would be copyable with shared semantics.
     * That can be useful too.
     */
    using impl_type = std::unique_ptr<impl_class, deleter>;

    auto construct() -> impl_type {
        return impl_type(new impl_class(),
                         deleter {this});
    }

    auto destroy(impl_class *impl) -> void {
        wait(*impl);
        delete impl;
    }

    template<class Job, class Handler>
    auto submit(impl_class &impl, Job &&job, Handler &&handler) {

        auto make_handler = make_async_handler<Handler>();


        auto async_handler = make_handler.wrap(std::forward<Handler>(handler));

        auto my_task = std::make_unique<task>();
        auto task_ptr = my_task.get();

        auto task_done = [
                this,
                task_id = task_ptr,
                &impl,
                async_handler
        ](auto error) {
            async_handler(error);
            this->remove_task(impl, task_id);
        };
        auto lock = impl.get_lock();
        impl.add_task(lock, std::move(my_task));
        launch(impl, task_ptr, std::forward<Job>(job), task_done);

        return make_handler.result.get();
    };

    template<class F, class Handler>
    auto launch(impl_class &, task *task_ptr, F &&f, Handler &&handler) -> void {
        this->thread_pool_.submit([f, handler] {
            auto error = std::exception_ptr();
            try {
                f();
            }
            catch (...) {
                error = std::current_exception();
            }
            handler(error);
        });
    }


    auto wait(impl_class &impl) -> void {
        impl.wait(impl.get_lock());
    }

    auto remove_task(impl_class &impl, task *task_id) -> void {
        impl.remove_task(impl.get_lock(), task_id);
    }


    boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};

};

/*
 * The task manage handle. Holds the task_manager implementation plus provides access to the
 * owning task_manager_service. In this case, the service is a global static object. In an io loop environment
 * for example, asio, the service would be owned by the io loop.
 */
struct task_manager {

    using service_type = task_manager_service;
    using impl_type = service_type::impl_type;
    using impl_class = decltype(*std::declval<impl_type>());

    task_manager()
            : service_(std::addressof(service_type::use()))
            , impl_(get_service().construct()) {}

    template<class Job, class Handler>
    auto submit(Job &&job, Handler &&handler) {
        return get_service().submit(get_impl(),
                                    std::forward<Job>(job),
                                    std::forward<Handler>(handler));
    }

    auto get_service() -> service_type & {
        return *service_;
    }

    auto get_impl() -> impl_class & {
        return *impl_;
    }

private:

    service_type* service_;
    impl_type impl_;
};


/*
 * helpful thread-safe emitter
 */
std::mutex thing_mutex;

template<class...Things>
void emit(Things &&...things) {
    auto lock = std::unique_lock<std::mutex>(thing_mutex);
    using expand = int[];
    void(expand{0,
                ((std::cout << things), 0)...
    });
    std::cout << std::endl;
}

using namespace std::literals;

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch (std::exception const &e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 std::vector 自动删除已完成的 future 的相关文章

  • Nullable 是不可能的,为什么不呢? [复制]

    这个问题在这里已经有答案了 如果这是一个愚蠢的问题 请原谅 我正在尝试更好地理解 Net 中的 Nullable 类型 从我从 Microsoft 源代码 使用 ReSharper 中注意到的内容 我了解到 Nullable 是一个结构 而
  • 使用 Xamarin.Forms 和 Zxing 生成 QR 码

    我在网上看到了很多关于这个的内容 旧帖子 但似乎没有什么对我有用 我正在尝试从字符串中生成二维码并将其显示在应用程序中 这就是我一开始的情况 qrCode new ZXingBarcodeImageView BarcodeFormat Ba
  • OpenGL缓冲区更新[重复]

    这个问题在这里已经有答案了 目前我正在编写一个模拟水的程序 以下是我所做的步骤 创建水面 平面 创建VAO 创建顶点缓冲区对象 在其中存储法线和顶点 将指针绑定到此 VBO 创建索引缓冲区对象 然后我使用 glDrawElements 渲染
  • C# Outlook 从收件人获取 CompanyName 属性

    我目前正在使用 C 编写 Outlook 2010 AddIn 我想要的是从我从 AppointmentItem 中提取的 Recipient 对象中获取 CompanyName 属性 因此 有了 AppointmentItem 的收件人
  • C++中的类要具备什么条件才能成为容器?

    我是 C 编程新手 偶然发现了这个术语containers举例如下vector deque map etc 一个企业的最低要求应该是什么class应该满足被称为container in C 我将从 范围 这个概念开始 Range 只有两个方
  • 如何查明 .exe 是否正在 C++ 中运行?

    给定进程名称 例如 程序 exe C 标准库没有这样的支持 您需要一个操作系统 API 来执行此操作 如果这是 Windows 那么您将使用 CreateToolhelp32Snapshot 然后使用 Process32First 和 Pr
  • 以下 PLINQ 代码没有改进

    我没有看到使用以下代码的处理速度有任何改进 IEnumerable
  • 如何调试在发布版本中优化的变量

    我用的是VS2010 我的调试版本工作正常 但我的发布版本不断崩溃 因此 在发布版本模式下 我右键单击该项目 选择 调试 然后选择 启动新实例 此时我看到我声明的一个数组 int ma 4 1 2 8 4 永远不会被初始化 关于可能发生的事
  • PrivateObject 找不到属性

    我的结构基本上如下所示 abstract class A protected string Identificator get set private void DoSomething DoSomethingSpecific protect
  • 关闭整数的最右边设置位

    我只需要关闭最右边的设置位即可 我的方法是找到最右边位的位置 然后离开该位 我编写这段代码是为了这样做 int POS int n int p 0 while n if n 2 0 p else break n n 2 return p i
  • 提升mapped_file_source、对齐方式和页面大小

    我正在尝试在性能很重要的上下文中解析一些大小高达几百兆字节的文本文件 因此我使用 boostmapped file source 解析器期望源以空字节终止 因此我想检查文件大小是否是页面大小的精确倍数 如果是 则使用较慢的非内存映射方法 我
  • 从点云检测平面集

    我有一组点云 我想测试3D房间中是否有角落 所以我想讨论一下我的方法 以及在速度方面是否有更好的方法 因为我想在手机上测试它 我将尝试使用霍夫变换来检测线 然后我将尝试查看是否有三条线相交 并且它们也形成了两个相交的平面 如果点云数据来自深
  • 在 C 语言中替换宏内的宏

    我正在尝试使代码部分可重用 我下面的评论片段没有达到我想要的效果 define NAME ABC define LOG SIZE NAME LEN 我想LOG SIZE决心ABC LEN 我尝试过使用 但没能让它发挥作用 LOG SIZE在
  • WPF DataGrid - 在每行末尾添加按钮

    我想在数据网格的每一行的末尾添加一个按钮 我找到了以下 xaml 但它将按钮添加到开头 有人知道如何在所有数据绑定列之后添加它吗 这会将按钮添加到开头而不是末尾
  • 如何调用与现有方法同名的扩展方法? [复制]

    这个问题在这里已经有答案了 我有这样的代码 public class TestA public string ColA get set public string ColB get set public string ColC get se
  • 值和类型的简洁双向静态 1:1 映射

    我将从我想象如何使用我想要创建的代码开始 它不必完全像这样 但它是我在标题中所说的 简洁 的一个很好的例子 就我而言 它是将类型映射到相关的枚举值 struct bar foo
  • 使用 IdentityDbContext 和 Code First 自动迁移表位置和架构的实体框架?

    我正在尝试使用 IdentityDbContext 类设置自动迁移更新 并将更改传播到整个数据库的实际 DbContext 在进入代码之前 在使用自动迁移实现 IdentityDbContext 时 我收到此错误 影响迁移历史系统表位置的自
  • MSVC编译器下使用最大成员初始化联合

    我正在尝试初始化一个LARGE INTEGER在 C 库中为 0 确切地说是 C 03 以前 初始化是 static LARGE INTEGER freq 0 在 MinGW 下它产生了一个警告 缺少成员 LARGE INTEGER Hig
  • 如何知道 HTTP 请求标头值是否存在

    我确信这很简单 但是却让我感到厌烦 我在 Web 应用程序中使用了一个组件 它在 Web 请求期间通过添加标头 XYZComponent true 来标识自身 我遇到的问题是 如何在视图中检查此组件 以下内容不起作用 if Request
  • Emacs C++,打开相应的头文件

    我是 emacs 新手 我想知道 是否有在头文件 源文件和相应的源文件 头文件之间切换的快捷方式 是否有像通用 emacs 参考卡那样的参考卡 Thanks There s ff find other file 您可以使用以下方法将其绑定到

随机推荐