基于Redis的MessageQueue队列封装

2023-05-16

原创文章,转载请注明出处:http://www.huyanping.cn/?p=275
作者:Jenner

Redis的链表List可以用来做链表,高并发的特性非常适合做分布式的并行消息传递。

项目地址:https://github.com/huyanping/Zebra-PHP-Framework

左进右出

$redis->lPush($key, $value);
$redis->rPop($key);

 以下程序已在生产环境中正式使用。

基于Redis的PHP消息队列封装

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-8-19
 * Time: 下午12:10
 *
 * 基于Redis的消息队列封装
 */
namespace Zebra\MessageQueue;

class RedisMessageQueue implements IMessageQueue
{

    protected $redis_server;

    protected $server;

    protected $port;

    /**
     * @var 消息队列标志
     */
    protected $key;

    /**
     * 构造队列,创建redis链接
     * @param $server_config
     * @param $key
     * @param bool $p_connect
     */
    public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)
    {
        if (empty($key))
            throw new \Exception('message queue key can not be empty');

        $this->server = $server_config['IP'];
        $this->port = $server_config['PORT'];
        $this->key = $key;

        $this->check_environment();
        if ($p_connect) {
            $this->pconnect();
        } else {
            $this->connect();
        }
    }

    /**
     * 析构函数,关闭redis链接,使用长连接时,最好主动调用关闭
     */
    public function __destruct()
    {
        $this->close();
    }

    /**
     * 短连接
     */
    private function connect()
    {
        $this->redis_server = new \Redis();
        $this->redis_server->connect($this->server, $this->port);
    }

    /**
     * 长连接
     */
    public function pconnect()
    {
        $this->redis_server = new \Redis();
        $this->redis_server->pconnect($this->server, $this->port);
    }

    /**
     * 关闭链接
     */
    public function close()
    {
        $this->redis_server->close();
    }

    /**
     * 向队列插入一条信息
     * @param $message
     * @return mixed
     */
    public function put($message)
    {
        return $this->redis_server->lPush($this->key, $message);
    }

    /**
     * 向队列中插入一串信息
     * @param $message
     * @return mixed
     */
    public function puts(){
        $params = func_get_args();
        $message_array = array_merge(array($this->key), $params);
        return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);
    }

    /**
     * 从队列顶部获取一条记录
     * @return mixed
     */
    public function get()
    {
        return $this->redis_server->lPop($this->key);
    }

    /**
     * 选择数据库,可以用于区分不同队列
     * @param $database
     */
    public function select($database)
    {
        $this->redis_server->select($database);
    }

    /**
     * 获得队列状态,即目前队列中的消息数量
     * @return mixed
     */
    public function size()
    {
        return $this->redis_server->lSize($this->key);
    }

    /**
     * 获取某一位置的值,不会删除该位置的值
     * @param $pos
     * @return mixed
     */
    public function view($pos)
    {
        return $this->redis_server->lGet($this->key, $pos);
    }

    /**
     * 检查Redis扩展
     * @throws Exception
     */
    protected function check_environment()
    {
        if (!\extension_loaded('redis')) {
            throw new \Exception('Redis extension not loaded');
        }
    }
}

如果需要一次写入多个队列,可以使用如下调用方式:

<?php
$redis = new RedisMessageQueue();
$redis->puts(1, 2, 3, 4);
$redis->puts(5, 6, 7, 8, 9);

模仿HTTPSQS输出结果的封装如下,提供了写入位置和读取位置记录的功能:

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-9-5
 * Time: 下午2:16
 *
 * 附加了队列状态信息的RedisMessageQueue
 */

namespace Zebra\MessageQueue;

class RedisMessageQueueStatus extends RedisMessageQueue {

    protected $record_status;

    protected $put_position;

    protected $get_position;

    public function __construct(
        $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),
        $key = 'redis_message_queue',
        $p_connect = false,
        $record_status=true
    ){
        parent::__construct($server_config, $key, $p_connect);
        $this->record_status = $record_status;
        $this->put_position = $this->key . '_put_position';
        $this->get_position = $this->key . '_get_position';
    }

    public function get(){
        if($queue = parent::get()){
            $incr_result = $this->redis_server->incr($this->get_position);
            if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server');
            return $queue;
        }else{
            return false;
        }
    }

    public function put($message){
        if(parent::put($message)){
            $incr_result = $this->redis_server->incr($this->put_position);
            if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server');
            return true;
        }else{
            return false;
        }
    }

    public function puts_status(){
        $message_array = func_get_args();
        $result = call_user_func_array(array($this, 'puts'), $message_array);
        if($result){
            $this->redis_server->incrBy($this->put_position, count($message_array));
            return true;
        }
        return false;
    }

    public function size(){
        return $this->redis_server->lSize($this->key);
    }

    public function status(){
        $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;
        $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;
        $status['unread_queue'] = $this->size();
        $status['queue_name'] = $this->key;
        $status['server'] = $this->server;
        $status['port'] = $this->port;

        return $status;
    }

    public function status_normal(){
        $status = $this->status();
        $message  = 'Redis Message Queue' . PHP_EOL;
        $message .= '-------------------' . PHP_EOL;
        $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;
        $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;
        $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;
        $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;

        return $message;
    }

    public function status_json(){
        return \json_encode($this->status());
    }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于Redis的MessageQueue队列封装 的相关文章

  • Node.js 上通过套接字连接 Redis

    由于共享托管 目标主机上的我的 redis 服务器不在端口上运行 而是在非常特定的套接字上运行 可以通过套接字文件连接到该套接字 只有我的用户可以访问 但是 我还没有找到如何通过套接字指定连接node redis and connect r
  • 我的 Redis 自动生成的密钥

    我不知道我的 Redis 版本 4 0 9 到底发生了什么 我正在运行一个应用程序并使用 Redis 来存储我的数据库 但是 然后 Redis 自动创建 3 个新键 Backup1 Backup2 Backup3 并删除我的所有数据 这是我
  • 仅当尚未设置时才进行原子设置

    仅当尚未在 Redis 中设置时 是否有办法执行原子设置 具体来说 我正在创建一个像 myapp user user email 这样的用户 并且希望 Redis 在 user email 已被占用时返回错误 而不是默默地替换旧值 比如声明
  • 如何在多个Lua State(多线程)之间传递数据?

    我在中启动Redis连接池redis lua 通过从 C 调用 我得到了redis lua state 此 Lua 状态全局启动一次 仅在其他线程中启动get从中 当有一个 HTTP 请求 工作线程 时 我需要从redis lua stat
  • Redis键空间事件不触发

    我有两个 Redis 客户端 在一个文件中我有一个简单的脚本设置并删除了 Redis 键 var redis require redis var client redis createClient 6379 127 0 0 1 client
  • 如何设置 Celery 以通过 ssl 与 Azure Redis 实例对话

    使用 的伟大答案 如何在microsoft azure上的django项目中配置celery redis https stackoverflow com questions 39616701 how to configure celery
  • WSL Redis 遇到系统尚未使用 systemd 作为 init 系统(PID 1)启动。无法操作[已关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在尝试遵循本文中讨论的 Redis 安装过程article https www digitalocean com community
  • msgget() 和 mq_open 之间的区别

    我阅读了有关消息队列操作的信息 例如 msgget msgsnd 和 msgrcv 但是当我在堆栈溢出上搜索消息队列相关问题时 我发现还有另一组消息队列操作 例如 mq open mq send mq receive 谁能告诉我这两种类型的
  • Spring Data Redis JedisConnectionException:流意外结束

    雷迪斯3 0 5Spring数据Redis 1 3 6绝地武士2 6 3 我们的 Web 应用程序通过 pub sub 从 Redis 接收数据 还以键 值对的形式在 Redis 上执行数据读 写 读 写发生在监听线程 独立监控线程和htt
  • redis 阻塞直到 key 存在

    我是 Redis 新手 想知道是否有办法能够await get通过它的键来获取值 直到该键存在 最小代码 async def handler data await self fetch key async def fetch key ret
  • 在 PHP 中使用消息队列与普通 Cron 作业之间的区别

    我们有一个基于 PHP 构建的大型 Web 应用程序 该应用程序允许安排推文和墙贴 并且有从服务器发出的预定电子邮件 我所说的 计划 是指这些 PHP 脚本计划在特定时间运行cron 大约有 7 个 PHP 文件可以完成上述工作 我一直听说
  • 如何测试我的 Redis 缓存是否正常工作?

    我已经安装了 django redis cache 和 redis py 我遵循了 Django 的缓存文档 据我所知 以下设置就是我所需要的 但我如何判断它是否正常工作 设置 py CACHES default BACKEND redis
  • 在 aws-elasticache 上使用 memcached 或 Redis

    我正在 AWS 上开发一个应用程序 并使用 AWS elasticache 进行缓存 我对使用 memcached 或 redis 感到困惑 我阅读了有关 redis 3 0 2 更新以及它现在如何等同于 memchached 的文章 ht
  • SignalR 无法连接到 SSL 上的 Azure Redis

    我目前在 Azure 上托管我的 redis 缓存服务器 并让 signalR 依赖它作为骨干 使用以下内容 GlobalHost DependencyResolver UseRedis 服务器 端口 密码 eventKey 这可以在端口
  • Redis Docker compose无法处理RDB格式版本10

    我无法在 docker compose 文件中启动 redis 容器 我知道docker compose文件没问题 因为我的同事可以成功启动项目 我读到有一个删除 dump rdb 文件的解决方案 但我找不到它 我使用Windows机器 任
  • 如何在Redis中从hmset()切换到hset()?

    我收到弃用警告 即 Redis hmset 已弃用 请改用 Redis hset 但是 hset 采用第三个参数 我不知道是什么name应该是 info users 10 timestamp datetime utcnow strftime
  • 使用 Sentinels 升级 Redis 的最佳实践?

    我有 3 个 Redis 节点 由 3 个哨兵监视 我进行了搜索 文档似乎不清楚如何最好地升级此类配置 我目前使用的是 3 0 6 版本 我想升级到最新的 5 0 5 我对这方面的程序有几个疑问 升级两个大版本可以吗 我在我们的暂存环境中执
  • 如何使redis中的“HSET”子键“过期”?

    我需要使 Redis 哈希中所有超过 1 个月的密钥过期 这不可能 https github com antirez redis issues 167 issuecomment 2559040 为了保持 Redis 简单 https git
  • Scala 使用的 Redis 客户端库建议

    我正在计划使用 Scala 中的 Redis 实例进行一些工作 并正在寻找有关使用哪些客户端库的建议 理想情况下 如果存在一个好的库 我希望有一个为 Scala 而不是 Java 设计的库 但如果现在这是更好的方法 那么仅使用 Java 客
  • 如何将“.csv”数据文件导入Redis数据库

    如何将 csv 数据文件导入 Redis 数据库 csv 文件中包含 id 时间 纬度 经度 列 您能否向我建议导入 CSV 文件并能够执行空间查询的最佳方法 这是一个非常广泛的问题 因为我们不知道您想要什么数据结构 您期望什么查询等等 为

随机推荐