我按照你所说的例子进行了操作。它们对我来说似乎并不完整,但我明白你的意思。 Ratchet 是一个服务器端脚本,仅允许您编写一个实现 websockets 并能够侦听 ZMQ 消息的服务。您将在命令行上启动 Ratchet 脚本,它作为服务与 Apache 并行运行。
这一切都独立于 websocket 的客户端。正如他们推荐的那样,我在客户端使用了 Autobahn.js。该库实现了 WAMP 协议。它最大限度地简化了客户端代码。
你的代码的问题是class Pusher implements WampServerInterface
没有public function onPostEntry
。这个类必须实现WampServerInterface
,这意味着它必须至少具有以下功能:
- onSubscribe(ConnectionInterface $conn, $topic)
- onUnSubscribe(ConnectionInterface $conn, $topic)
- onOpen(ConnectionInterface $conn)
- onClose(ConnectionInterface $conn)
- onPublish(ConnectionInterface $conn, $topic, $event, 数组 $exclude, 数组 $eligible
- onError(ConnectionInterface $conn, \Exception $e)
- 关于 ZMQ 消息($json 数据)
还可以有其他更高级的功能,例如call
客户端上的远程过程。
在发送方(ZMQ 消息),输入以下代码:
$zmq = new ZMQWrapper;
$zqm->publish('posts', $response);
class ZMQWrapper {
function __construct(){
$this->context = new ZMQContext();
$this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH);
$this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500);
$this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT);
}
function publish($topic, $msg){
$data = ['topic' => "mb.$topic", 'msg' => $msg];
$this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT);
}
}
在推送文件中添加如下内容:
public function onSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client subscribed to %s', $topicId));
// you could broadcast that user x joined the discussion
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client unsubscribed from %s', $topicId));
// you could broadcast that user x leaved the discussion
}
public function onOpen(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d connected', $conn->resourceId));
$this->clients[$conn->resourceId] = array(); // this will allow you to save state information of the client, you can modify in onSubscribe and onUnsubscribe
// clients will contain the list of all clients
}
public function onClose(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d disconnected', $conn->resourceId));
// you could broadcast that user x leaved the discussion
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event)));
foreach($topic->getIterator() as $peer){
if(!in_array($peer->WAMP->sessionId, $exclude)){
$peer->event($topicId, $event);
}
}
}
最后一块在客户端。如果用户打开页面mysite/allposts
,在 javascript 中你包括autobahn.js
。 websocket 将在变量下可用ab
。然后你做:
打开页面时:
var currentSession;
ab.connect(
Paths.ws,
function(session) { // onconnect
currentSession = session
onWsConnect(session)
},
function(code, reason, detail) {// onhangup
onWsDisconnect(code, reason, detail)
},{
maxRetries: 60,
retryDelay: 2000,
skipSubprotocolCheck: true
}
)
currentSession.subscribe('posts', onPostReceived)
function onPostReceived(topic, message){
//display the new post
}
关闭页面时:
currentSession.unsubscribe(topic)
你注意到我把一切都保持得很笼统。这允许我在同一个系统中处理多种类型的消息。不同之处在于 ZMQ 消息和参数currentSession.subscribe
.
在我的实现中,我还跟踪打开连接的登录用户,但我剥离了这部分代码。
我希望这能帮到您。