你可以找到该程序here https://pastebin.com/H5fq732a
我正在消息传递框架 0MQ 中构建一个程序。我尝试执行我发布的内容here https://stackoverflow.com/questions/44096203/another-reliable-way-to-do-pull-push-sync-in-zeromq
程序编译为g++ -std=c++11 test.cpp -o test -lzmq -lpthread
.
要运行该程序,请传递一个参数作为您想要的线程号。然后将该参数分配给变量worker_num
.
在主线程中,我设置线程:
vector<thread> pool;
for(int i = 0; i < worker_num; i++)
{
cout << "main() : creating thread, " << i << endl;
pool.push_back(thread(task1, (void *)&context, i));
}
我想确保所有工作线程在主线程将作业分发给它们之前都已成功连接到主线程。
while(true)
{
if(sync_done)
{
cout << "sync done in main thread" << endl;
break;
}
zmq::message_t sync_msg(4);
memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
for(int i = 0; i < worker_num; i++)
distask_socket.send(sync_msg);
for(int i = 0; i < worker_num; i++)
{
if(sync_done)
break;
if(i != 0)
this_thread::sleep_for(chrono::milliseconds(500));
zmq::message_t res_msg;
int ret = getres_socket.recv(&res_msg, ZMQ_DONTWAIT);
if(ret == -1 && errno == EAGAIN)
continue;
int threadID = stoi(string((char *)res_msg.data()));
sync_done = if_sync_done(threadID, sync_array, worker_num);
}
}
那么主线程所做的就是:push #worker_num ofsync
每次将其 PUSH 端点发送到工作线程,然后从其 PULL 端点读取确认消息。如果主线程检索到 #worker_num 的确认消息,则同步完成。来自工作线程的同步消息的格式为:字符串中的工作线程 ID。所以线程 0 会传递一个0
在字符串中返回主线程。
但是运行我的程序:
$ ./test 1
main() : creating thread, 0
thread id:0
thread 0 receives: sync
thread 0 sends: 0
thread 0 sync done
main thread receives sync msg from thread 1 # you may get many copies of this msg
terminate called after throwing an instance of 'std::invalid_argument'
what(): stoi
Aborted
main thread receives sync msg from thread 1
意味着创建了 2 个线程:线程 0 和线程 1。知道为什么吗?我确实通过了1
作为参数。请注意,如果您自己运行该程序,您可能会得到其他输出。
UPDATE:
计划更新:here https://pastebin.com/USDVwehY.
最后我明白出了什么问题。
预期输出,您会看到线程 0 传递了 a0
到主线程通知同步完成:
$ ./test 1
input parameter is: 1
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread
意外的输出,您会看到不可打印的字符被传递给stoi()
:
$ ./test 1
input parameter is: 1
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass to if_sync_done # !!!!!
terminate called after throwing an instance of 'std::invalid_argument'
what(): stoi
Aborted
所以看来我用message_t
错误地。所以我需要确保在主线程将内容传递给之前stoi()
,缓冲区仍然存在。
我自己添加一个答案。