I. Motivation
在分布式计算模型中,常常会遇到线程间通信(同/异步)的问题,比如 Master 分配任务给 Worker ,后者在完成任务之后,需要向 Master 打报告,请求其处理返回结果,其中 Master 的业务逻辑是一个死循环,它一直在分配任务
这是个很经典的通信过程,如果用 C++ 自带的那套机制去解决这个问题,那么就只能采用共享内存法才能达到向 Master 打报告的目的,因为只有让 Master 跳出死循环,它才能处理其他对象发给它的信号(可以理解为 Master 线程腾不出手)。具体表现为 Worker 在向 Master 打报告时,修改 Master 的循环条件变量,使其跳出分配任务的死循环,转而处理返回结果,待处理完成后再重新进入业务循环中。这样做是可以的,但有失 OOP 的原则( Worker 的手竟然能伸进 Master 的口袋,去修改 Master 的成员变量!这是绝对不允许的)
在 Qt 的世界里有一种更为简单且原始的方法,类似于 OS 中的中断( INTR,Interruption )机制。在 Worker 需要打报告的时候,就调用 Master 对外提供的中断方法即可(无需再跟从前一样修改 Master 的变量),告诉 Master :我 Worker 这有个返回结果想交给你,你先暂停一下,查收一下,确认接收后再继续你的业务循环
在我看来,这是种比较好的交互方式,做到了互不过多打扰。Worker 只知道有 Master 这个人而已,且 Master 也是。两者并不知道对方太多的讯息,更不会发生手贱修改对方成员变量的情况
问题讲完了,下面就看一看具体的代码吧
II. Solution
在经典的分布式计算模型中,一般会有个 Master 节点,它主要负责向空闲的 Worker 分配任务,而且一般是一直分配 or 直到任务队列中已无任务,简单用代码表示,
void
Master::loop()
{
while (true) {
遍历空闲 Worker 集合,挑选空闲 Worker
if(没有空闲 Worker )
continue;
/* 向编号为 id 的 Worker 发送任务 */
emit sig_dispatch(id);
}
}
这就是 Master 任务分配的逻辑,emit
是 Qt 特有的信号槽机制(引以为傲)中的内容,表示 Master 线程要发送一个信号,寻求与其他线程(对 sig_dispatch
信号感兴趣的)通信,通信的内容为变量 id
何为感兴趣?也就是 Qt 的另外一个话题了,即 Connect 机制。在此不详细展开了,Worker 在接收到 sig_dispatch
信号后,就着手开始具体的任务计算了,
void
Worker::recv_tasking(unsigned id, unsigned sec)
{
/* Master 广播的任务是发给自己的,那就接收下来,否则丢弃 */
if (id_ != id)
return;
printf("worker %d tasking...%d\n", id_, sec);
QThread::sleep(sec);
/* 完成任务后,尝试向 Master 打报告 */
while (!ma_->intr_taskdone())
;
/* Master 批准之后再向其返回结果 */
emit sig_taskdone(id, sec);
}
在这里,我用了简单的线程休眠来代替具体的业务计算(不是重点)。在 Worker 的处理流程当中,首先根据 Master 广播的 Packet 中的 id
编号,判别这个 Packet 是否是发给自己的;然后接收任务进行计算;完成任务后,下面是重点
要向 Master 打报告,Worker 线程是知道 Master 的存在的(变量 ma_
),调用 intr_taksdone()
方法告诉 Master :喂,Master ,我是 Woker x 号,你之前要的计算任务我已经完成了,你要不现在腾个手,接收一下?
为什么要用 while 循环包裹呢?是因为报告一次可能会不成功,所以要一直举手打报告,直到 Master 看见并停下手中的工作。然后再发送 sig_taskdone
信号并将结果返回。其中的方法 intr_taskdone()
是基于 Qt 线程原有机制的屑微封装,
bool
Master::intr_taskdone()
{
mathrd_->requestInterruption();
return mathrd_->isInterruptionRequested();
}
调用了 QThread::requestInterruption()
方法和 QThread::isInterruptionRequested()
。这两个方法是一对儿,前者大致功能是尝试中断正在运行的线程,对应代码中,即是请求中断线程 mathrd_
( Master 线程),并返回中断成功与否的情况
如果不用 Qt 线程自带的中断法,那代码就应该写成这样,Master 的业务逻辑不再是死循环,而是条件循环,且这个循环条件抛给 Worker (它可以修改),
void
Master::loop()
{
while (!intr) {
遍历空闲 Worker 集合,挑选空闲 Worker
if(没有空闲 Worker )
continue;
/* 向编号为 id 的 Worker 发送任务 */
emit sig_dispatch(id);
}
}
变量 intr
控制着 Master 是否要继续分配任务,当其为真时,Master 会跳出循环,这意味此时有 Worker 前来返回结果。Master 在处理完 Worker 返回的结果后,因为需要继续执行分配任务的业务逻辑,所以在接收 sig_taskdone
信号的槽函数 rec_taskdone
中将这套流程串联起来,
void
Master::recv_taskdone()
{
处理 Worker 的返回结果
intr = false;
loop();
}
Worker 应该这样写,
void
Worker::recv_tasking(unsigned id, unsigned sec)
{
/* Master 广播的任务是发给自己的,那就接收下来,否则丢弃 */
if (id_ != id)
return;
printf("worker %d tasking...%d\n", id_, sec);
QThread::sleep(sec);
ma_->intr = true;
/* Master 跳出循环后再向其返回结果 */
emit sig_taskdone(id, sec);
}
直接修改 Master 的成员变量,虽然也可以进行封装,使其不能直接触碰到 Master 的核心,但是我还是觉得这种方法与 OOP 有点格格不入,传统的 C++ 多线程确实是应该这么写,但是 Qt 线程机制有了更好的选择
而且,当有多个 Worker 的情况下,还需考虑到线程安全的问题,即是在修改条件循环变量 ma_->intr
时还要对其进行上锁,等等一系列的同步操作,真的是让人头疼
III. Evaluation
在 C++ 的世界里,如果想实现线程之间的通信,无非是通过共享内存以及条件变量来实现。这些手段未免太过暴露且繁琐,很容易出错。以至于在现代语言 Go 中采用了 Channel 通信,方便各协程之间的同/异步
Go 中的协程,可以理解为轻量级的线程,但不同于 C/C++ 中的线程。线程是 OS 调度的最小单位,多个线程在 CPU 上是可以并行的;而协程是并发的,一个进程可以有多个协程,但在一个时间点上只能执行某个协程
最直白的例子,就是并行好比三个人同时比赛吃包子,每人都有一个包子,同时啃;而并发就好比一个人同时要吃三个包子,他为了完成任务,只能包子 A 啃一会,然后啃包子 B ,再轮到包子 C ,以此往复,直到吃完三个包子
所以并行和并发乍一听,可能会觉得差不多,都是完成同一件事情。但是仔细想想,两者差别还是很大的。并行是多个核心同时在做事,而且是不同的事,属于是同时赛跑;而并发是一个核心,它轮转做着好几件事,因为轮转的速度比较快,所以人们会有一种错觉:感觉这几件事是同时发生的
都是做事,可能做的很快,并发就被认为是并行,这其实是天大的误会。并发就是并发,并发永远不能代替并行,在很多模拟的场合下,如果错误地用并发的手段代替了并行,将会得到错得离谱的结果(车辆路径问题,Vehicle Routing Problem )