首先:您可以广播 UDP,但这不适用于已连接的客户端。那只是...UDP。
其次,该链接展示了如何在 Asio 中拥有类似条件变量(事件)的接口。这只是你问题的一小部分。您忘记了大局:您需要以某种方式了解一组开放连接:
- 例如保存会话指针的容器(
weak_ptr
)到每个连接
- 每个连接订阅一个信号槽(例如升压信号).
选项 1. 对于性能来说非常好,选项 2. 对于灵活性更好(将事件源与订阅者解耦,使得可以拥有异构订阅者,例如不来自连接)。
因为我认为选项 1. 对于线程来说更简单,对于线程来说更好。效率(例如,您可以从一个缓冲区为所有客户端提供服务而无需复制),并且您可能不需要双重解耦信号/槽,让我参考一个答案,我已经在其中展示了纯 Asio(没有 Beast)的内容:
- 如何设计 boost::asio 套接字或其包装器的正确释放
它展示了“连接池”的概念——本质上是一个线程安全的容器weak_ptr<connection>
具有一些垃圾收集逻辑的对象。
演示:Echo 服务器简介
After 谈论事情我想花时间实际演示这两种方法,这样我在说什么就完全清楚了。
首先让我们展示一个简单的、普通的异步 TCP 服务器
- 具有多个并发连接
- 每个连接的会话从客户端逐行读取,并将相同的内容回显给客户端
- 3秒后停止接受,最后一个客户端断开连接后退出
master branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
private:
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
session->start();
if (!ec)
accept_loop();
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(3s);
s.stop(); // active connections will continue
th.join();
}
方法一、添加广播消息
因此,让我们添加同时发送到所有活动连接的“广播消息”。我们添加两个:
- 每个新连接时一个(表示“玩家 ## 已进入游戏”)
-
一个模拟全局“服务器事件”的事件,就像您在问题中所描述的那样)。它从 main 内部触发:
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
请注意我们如何通过向每个接受的连接注册一个弱指针并在每个连接上进行操作来做到这一点:
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
broadcast
也可以直接使用main
简单来说就是:
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
using-asio-post branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
private:
using connptr = std::shared_ptr<connection>;
using weakptr = std::weak_ptr<connection>;
std::mutex _mx;
std::vector<weakptr> _registered;
size_t reg_connection(weakptr wp) {
std::lock_guard<std::mutex> lk(_mx);
_registered.push_back(wp);
return _registered.size();
}
template <typename F>
size_t for_each_active(F f) {
std::vector<connptr> active;
{
std::lock_guard<std::mutex> lk(_mx);
for (auto& w : _registered)
if (auto c = w.lock())
active.push_back(c);
}
for (auto& c : active) {
std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
f(*c);
}
return active.size();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
方法 2:那些带有增强信号的广播2
信号方法是一个很好的例子依赖倒置.
最显着的注释:
- 信号槽在调用它的线程上被调用(“引发事件”)
- the
scoped_connection
有这样的订阅吗*自动地删除时connection
被破坏
- there's 控制台消息的措辞存在细微差别从“达到 # 个活跃连接”到“达到 # 个活跃连接”订户".
差异是理解增加的灵活性的关键:信号所有者/调用者对订阅者一无所知。这就是我们所说的解耦/依赖倒置
using-signals2 branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
boost::signals2::scoped_connection _subscription;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
_broadcast_event(msg);
return _broadcast_event.num_slots();
}
private:
boost::signals2::signal<void(std::string const& msg)> _broadcast_event;
size_t reg_connection(connection& c) {
c._subscription = _broadcast_event.connect(
[&c](std::string msg){ c.send(msg, true); }
);
return _broadcast_event.num_slots();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(*session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active subscribers\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
See the diff between Approach 1. and 2.: Compare View on github
针对 3 个并发客户端运行时的输出示例:
(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)