Note: 这是not与这个问题 https://stackoverflow.com/questions/17583903/how-to-get-the-connection-object-of-a-specific-user它利用MessageComponentInterface
。我在用WampServerInterface
相反,所以这个问题具体涉及该部分。我需要带有代码示例和解释的答案,因为我认为这对将来的其他人有帮助。
尝试对单个用户进行循环推送
我正在使用 Ratchet 和 ZeroMQ 的 WAMP 部分,目前我有一个工作版本推送集成教程 http://socketo.me/docs/push.
我正在尝试执行以下操作:
- Zeromq 服务器已启动并正在运行,准备记录订阅者和取消订阅者
- 用户通过 websocket 协议在浏览器中连接
- A loop开始发送数据到特定用户谁要求的
- 当用户断开连接时,该用户数据的循环将停止
我有第 (1) 点和第 (2) 点工作,但我遇到的问题是第三点:
首先:如何仅向每个特定用户发送数据?广播将其发送给每个人,除非“主题”最终可能是个人用户 ID?
第二:我有一个很大的安全问题。如果我要从客户端发送哪个用户 ID 想要订阅(这似乎是我需要的),那么用户只需将变量更改为另一个用户的 ID,然后就会返回他们的数据。
第三:我必须运行一个单独的 php 脚本包含用于启动实际循环的 Zeromq 代码。我不确定这是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的 php 文件。这是我需要整理的一个主要领域。
以下代码显示了我目前拥有的内容。
仅从控制台运行的服务器
我逐字输入php bin/push-server.php
运行这个。订阅和取消订阅都会输出到此终端以用于调试目的。
$loop = React\EventLoop\Factory::create();
$pusher = Pusher;
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
),
$webSock
);
$loop->run();
通过 websocket 发送数据的 Pusher
我省略了无用的东西并专注于onMessage()
and onSubscribe()
方法。
public function onSubscribe(ConnectionInterface $conn, $topic)
{
$subject = $topic->getId();
$ip = $conn->remoteAddress;
if (!array_key_exists($subject, $this->subscribedTopics))
{
$this->subscribedTopics[$subject] = $topic;
}
$this->clients[] = $conn->resourceId;
echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}
public function onMessage($entry) {
$entryData = json_decode($entry, true);
var_dump($entryData);
if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
return;
}
$topic = $this->subscribedTopics[$entryData['topic']];
// This sends out everything to multiple users, not what I want!!
// I can't send() to individual connections from here I don't think :S
$topic->broadcast($entryData);
}
开始循环使用上述 Pusher 代码的脚本
这是我的问题 - 这是一个单独的 php 文件,希望将来可以集成到其他代码中,但目前我不确定如何正确使用它。我是否从会话中获取用户的 ID?我仍然需要从客户端发送它......
// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];
$loop = React\EventLoop\Factory::create();
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {
$entryData = array(
'topic' => 'subscriptionTopicHere',
'userId' => $userId
);
$i++;
// So it doesn't go on infinitely if run from browser
if ($i >= 3)
{
$loop->stop();
}
// Send stuff to the queue
$socket->send(json_encode($entryData));
});
最后是客户端js订阅
$(document).ready(function() {
var conn = new ab.Session(
'ws://localhost:8080'
, function() {
conn.subscribe('topicHere', function(topic, data) {
console.log(topic);
console.log(data);
});
}
, function() {
console.warn('WebSocket connection closed');
}
, {
'skipSubprotocolCheck': true
}
);
});
结论
上面的内容有效,但我确实需要弄清楚以下内容: