线程是执行序列。它们的行为大致类似于线性 C++ 程序,嵌入在内存模型中,使它们能够进行通信并注意到由其他执行线程引起的状态变化。
如果没有线程的配合,对线程的回调无法接管执行序列。您想要通知的线程必须显式检查消息是否已到达并处理它。
有两种常见的方法来处理消息响应。
第一个是std::future
喜欢的方法。在其中,调用者获得某种令牌,该令牌代表将来可能或将会产生的答案。
第二种是再次使用消息传递。您向 B 发送一条消息,请求回复。 B 将包含响应的消息发送回 A。与 B 接收消息的方式相同,A 也接收回消息。该消息可能包含某种“返回目标”,以帮助 A 将其链接到原始消息。
在基于消息的系统中,通常有一个“事件循环”。您拥有一个反复返回“事件循环”的线程,而不是大型线性程序。它在那里检查队列中是否有消息,如果没有则等待一些消息。
在这样的系统下,任务必须被分解为小块,以便您经常检查事件循环以做出响应。
实现此目的的一种方法是使用协程,这是一种不拥有自己的执行器的执行状态(就像线程一样,它同时拥有两者)。协程定期放弃优先级并“保存其状态供以后使用”。
未来的解决方案通常是最简单的,但它依赖于 A 定期检查响应。
首先,一个threaded_queue<T>
,它允许任意数量的生产者和消费者将东西传递到队列中并从前面吃掉它们:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back( T t ) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); } );
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
现在,我们希望将任务传递到这样的队列中,并让传递任务的人返回结果。以下是上述内容在打包任务中的用法:
template<class...Args>
struct threaded_task_queue {
threaded_task_queue() = default;
threaded_task_queue( threaded_task_queue&& ) = delete;
threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
~threaded_task_queue() = default;
template<class F, class R=std::result_of_t<F&(Args...)>>
std::future<R> queue_task( F task ) {
std::packaged_task<R(Args...)> p(std::move(task));
auto r = p.get_future();
tasks.push_back( std::move(p) );
return r;
}
void terminate() {
tasks.terminate();
}
std::function<void(Args...)> pop_task() {
auto task = tasks.pop_front();
if (!task) return {};
auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
return [task_ptr](Args...args){
(*task_ptr)(std::forward<Args>(args)...);
};
}
private:
threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
如果我做对了,它的工作原理如下:
A 以 lambda 的形式向 B 发送队列任务。这个 lambda 接受一些固定的参数集(由 B 提供),并返回一些值。
B 出队列,得到 astd::function
这需要参数。它调用它;它返回void
在 B 的上下文中。
A 被给予future<R>
当它对任务进行排队时。它可以查询它以查看它是否完成。
您会注意到,无法“通知”A 事情已完成。这需要不同的解决方案。但如果 A 最终达到了不等待 B 结果就无法前进的地步,那么这个系统就可以工作。
另一方面,如果 A 积累了大量此类消息,有时需要等待许多此类 B 的输入,直到其中任何一个返回数据(或用户执行某些操作),那么您需要比 A 更高级的东西std::future<R>
。一般模式——拥有代表未来要交付的计算的令牌——是可靠的。但是您需要对其进行增强,以便与未来计算和消息循环等的多个源良好地配合。
代码未测试。
一种方法threaded_task_queue
当您发送消息时:
template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
threaded_task_queue< std::function<R(Args...)> >
{
std::future<R> queue_message(Args...args) {
return this->queue_task(
[tup = std::make_tuple(std::forward<Args>(args)...)]
( std::function<R(Args...)> f ) mutable
{
return std::apply( f, std::move(tup) );
}
);
}
bool consume_message( std::function<R(Args...)> f )
{
auto task = pop_task();
if (!task) return false;
task( std::move(f) );
return true;
}
};
在提供商方面,您提供Args...
,而在消费者方面你消费Args...
并返回R
,在提供者方面你有一个future<R>
消费者完成后即可得到结果。
这可能比原始的更自然threaded_task_queue
我写。
std::apply
是 C++17,但 C++11 和 C++14 也有广泛的实现。