如何将 ZeroMQ 套接字与 Ratchet Web-socket 库绑定以实现 PHP 应用程序的实时应用程序?

2024-01-05

我只是涉及 websocket、Ratchet 和 ZeroMQ 的整个领域的初学者。

以我的基本理解:

websocket有助于在服务器和客户端之间创建开放连接。

Ratchet是一个基于 PHP 的库,它使用 PHP 的核心 Socket 函数来创建 PHP 套接字框架,使我们能够轻松进行 PHP 套接字编程。

ZeroMQ是一个套接字库,可帮助非棘轮应用程序(其他 PHP 脚本)通过棘轮套接字和 Web 套接字发送数据。

我正在关注棘轮中关于“hello world”和“pusher”的教程,但它们似乎都不完整,只教如何仅使用控制台。我还在 github 中找到了ratchet-example,但没有正确记录。我正在寻找一个完整的示例(带有专用的 html 页面和 javascript)

下面是我正在处理的代码: 这是我正在发出 Ajax 请求的控制器方法之一。此方法将创建一个新帖子(可以说)。我想在 ZeroMq 的帮助下通过广播/推送来动态更新多个客户端浏览器中的帖子列表。

控制器中的方法:

public function create_new_post(){
    // ------
    // code to create a new post.
    // -------

    // After creating a post
    $response = [
        'new_post_title'    => $title,
        'post_id'           => $id
    ];

    $context = new ZMQContext();
    $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    $socket->connect("tcp://localhost:8000");
    $socket->send(json_encode($response));

}

推杆文件:

use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;

class Pusher implements WampServerInterface{

     public function onPostEntry($data){
         // Data that were sent by ZeroMQ through create_new_post() method
         $entry_data = json_decode($data);      

         // AND AFTER THIS, I DONT HAVE CLUE OF WHAT TO DO NEXT !!             

     }
}

运行服务器的 Shell 脚本:

require dirname(__DIR__) . '/vendor/autoload.php';

$loop   = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;

// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8000'); 
$pull->on('message', array($pusher, 'onBidEntry'));

// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); 
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            new Ratchet\Wamp\WampServer(
                $pusher
            )
        )
    ),
    $webSock
);

$loop->run();

Shell 脚本仅告知它将在端口 8080 上提供服务,但是我如何提及我的路由。假设我只想在“mysite/allposts”页面中打开连接。另外,我必须在客户端(一个 javascript 文件)编写的脚本是什么,以及如何通过更新特定 DOM 对象的客户端接收这些新数据。


我按照你所说的例子进行了操作。它们对我来说似乎并不完整,但我明白你的意思。 Ratchet 是一个服务器端脚本,仅允许您编写一个实现 websockets 并能够侦听 ZMQ 消息的服务。您将在命令行上启动 Ratchet 脚本,它作为服务与 Apache 并行运行。

这一切都独立于 websocket 的客户端。正如他们推荐的那样,我在客户端使用了 Autobahn.js。该库实现了 WAMP 协议。它最大限度地简化了客户端代码。

你的代码的问题是class Pusher implements WampServerInterface没有public function onPostEntry。这个类必须实现WampServerInterface,这意味着它必须至少具有以下功能:

  • onSubscribe(ConnectionInterface $conn, $topic)
  • onUnSubscribe(ConnectionInterface $conn, $topic)
  • onOpen(ConnectionInterface $conn)
  • onClose(ConnectionInterface $conn)
  • onPublish(ConnectionInterface $conn, $topic, $event, 数组 $exclude, 数组 $eligible
  • onError(ConnectionInterface $conn, \Exception $e)
  • 关于 ZMQ 消息($json 数据)

还可以有其他更高级的功能,例如call客户端上的远程过程。

在发送方(ZMQ 消息),输入以下代码:

$zmq = new ZMQWrapper;
$zqm->publish('posts', $response);

class ZMQWrapper {
    function __construct(){
        $this->context = new ZMQContext();
        $this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH);
        $this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500);
        $this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT);
    }
    function publish($topic, $msg){
        $data = ['topic' => "mb.$topic", 'msg' => $msg];
        $this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT);
    }
}

在推送文件中添加如下内容:

public function onSubscribe(ConnectionInterface $conn, $topic) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('A client subscribed to %s', $topicId));
    // you could broadcast that user x joined the discussion
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('A client unsubscribed from %s', $topicId));
    // you could broadcast that user x leaved the discussion
}
public function onOpen(ConnectionInterface $conn) {
    $log = $this->getLogger();
    $log->info(sprintf('Client %d connected', $conn->resourceId));
    $this->clients[$conn->resourceId] = array(); // this will allow you to save state information of the client, you can modify in onSubscribe and onUnsubscribe
    // clients will contain the list of all clients
}
public function onClose(ConnectionInterface $conn) {
    $log = $this->getLogger();
    $log->info(sprintf('Client %d disconnected', $conn->resourceId));
    // you could broadcast that user x leaved the discussion
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event)));
    foreach($topic->getIterator() as $peer){
        if(!in_array($peer->WAMP->sessionId, $exclude)){
            $peer->event($topicId, $event);
        }
    }
}

最后一块在客户端。如果用户打开页面mysite/allposts,在 javascript 中你包括autobahn.js。 websocket 将在变量下可用ab。然后你做:

打开页面时:

var currentSession;
ab.connect(
    Paths.ws,
    function(session) { // onconnect
        currentSession = session
        onWsConnect(session)
    },
    function(code, reason, detail) {// onhangup
        onWsDisconnect(code, reason, detail)
    },{
        maxRetries: 60,
        retryDelay: 2000,
        skipSubprotocolCheck: true
    }
)
currentSession.subscribe('posts', onPostReceived)

function onPostReceived(topic, message){
    //display the new post
}

关闭页面时:

currentSession.unsubscribe(topic)

你注意到我把一切都保持得很笼统。这允许我在同一个系统中处理多种类型的消息。不同之处在于 ZMQ 消息和参数currentSession.subscribe.

在我的实现中,我还跟踪打开连接的登录用户,但我剥离了这部分代码。

我希望这能帮到您。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 ZeroMQ 套接字与 Ratchet Web-socket 库绑定以实现 PHP 应用程序的实时应用程序? 的相关文章

随机推荐