异步输出日志
平时开发过程中总是不可避免会用到生产者-消费者模型来实现一些具体的功能。
比如在应用程序中,我们希望在关键的代码附近输出一些日志(到文件),以备程序运行出现bug时尽可能地知道更多的运行时信息,有利于bug分析和故障解决。但是,我们不能为了输出日志,而影响了程序的性能,例如我们不能想当然地直接在目标代码附近直接写上一段"将日志内容保存到磁盘文件"这样的代码,虽然它确实可以完成日志输出的功能,但是我们要知道将内存中的数据保存到文件属于磁盘I/O,如果当前保存的日志数据量很大,那就可能非常耗时,这无疑会对程序本身的业务性能造成影响,这也不是我们希望看到的。
这种情况下就可以利用一个简单的生产者-消费者模型(Producer-Consumer Model)来实现一个异步的日志组件。我们可以针对日志这种功能进行简单的分析:
日志消息无非就是一些字符串,输出日志无非就是将一个字符串保存到文件。既然不能在目标代码附近当场保存文件,那就可以引入一个队列,我们把需要保存的日志消息先塞到这个队列,另外引入一个线程,它负责不断地从队列中取出待保存的日志消息,并保存到文件中
你也许会说"又是队列又是线程的,不还是要做磁盘I/O,花里胡哨的有用吗?"
有用!因为塞一条字符串到队列尾部的过程是在内存中完成的,这远比将这条字符串保存到磁盘快得多,我们只需要在希望输出日志的地方向队列提交日志消息即可,保存日志消息到文件的任务已经被赶到一个单独的线程中来执行,所有提交的日志消息迟早会被保存到磁盘,但应用程序业务不再受磁盘I/O速度的影响,这多是一件美事啊。
具体的思路是这样:
实现一个日志类(组件类),它的每个实例维护一个日志消息队列和一个日志消息处理线程,线程不断地从队列中取出日志消息并保存到文件中。这个日志类至少提供以下接口:
/**
* 描述: 提交一条日志消息到队列中
* 访问: public
* 返回: bool 提交成功返回true,否则返回false
* @ const char * lpszLogMsg: 待提交的日志消息字符串
*/
bool CommitLogMsg(const char* lpszLogMsg);
该接口负责将指定的日志消息塞到队列(队尾),当然,这要求日志处理线程已经创建,因此我们另外提供以下两个接口以便于控制线程的启动和停止:
/**
* 描述: 启动组件
* 访问: public
* 返回: bool 启动成功返回true,否则返回false
* @ const char * lpszLogFilePath: 日志保存的文件路径
*/
bool Start(const char* lpszLogFilePath);
/**
* 描述: 停止组件
* 访问: public
*/
void Stop();
Start()接口实际上是创建一个日志处理线程,不断地检查队列中是否有新的日志消息到来,并取出它们进行保存;
Stop()接口负责优雅地停止线程。
只要保证在提交日志(CommitLogMsg)之前日志组件已经启动(Start)就可以了,这样就可以通过日志组件实现异步输出日志到文件了。具体的代码如下:
/*!
* \文件 MyLogger.h
* \作者 huge
* \日期 2022/06/28
* \描述 日志组件类源码,实现了一个简单的异步日志输出的组件
*/
#ifndef MYLOGGER_H
#define MYLOGGER_H
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <fstream>
namespace logger
{
class MyLogger
{
public:
/**
* 描述: 启动组件
* 访问: public
* 返回: bool 启动成功返回true,否则返回false
* @ const char * lpszLogFilePath: 日志保存的文件路径
*/
bool Start(const char* lpszLogFilePath)
{
if (m_bStop)
{
m_strLogFilePath = lpszLogFilePath;
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_bStop = false;
}
//创建一个日志处理线程
m_thSaveLog = std::thread(&MyLogger::fnSaveLog, this);
}
return true;
}
/**
* 描述: 停止组件
* 访问: public
*/
void Stop()
{
if (!m_bStop)
{
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_bStop = true;
}
// 唤醒所有日志处理线程(当然本例只有一个线程),使它们安全的终止
m_cvNewLogOrStoped.notify_all();
if (m_thSaveLog.joinable())
m_thSaveLog.join();
}
}
/**
* 描述: 提交一条日志消息到队列中
* 访问: public
* 返回: bool 提交成功返回true,否则返回false
* @ const char * lpszLogMsg: 待提交的日志消息字符串
*/
bool CommitLogMsg(const char* lpszLogMsg)
{
if (m_bStop || !lpszLogMsg)
return false;
// 安全地将日志消息塞入队列尾部
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_LogMsgQue.push(lpszLogMsg);
}
//唤醒日志处理线程
m_cvNewLogOrStoped.notify_one();
return true;
}
protected:
/**
* 描述: 日志保存线程函数
* 访问: protected
* 返回: void
*/
void fnSaveLog()
{
//以追加方式打开日志文件(这里省略了创建日志文件的过程,不必在意这些细节)
std::ofstream ofs(m_strLogFilePath, std::ios_base::out | std::ios_base::app);
//进入日志处理循环
while (true)
{
std::string log_msg; // 保存即将取出的单条日志消息
{
// 检查组件是否被停止,或有新的日志消息被提交,在此条件上等待被唤醒
std::unique_lock<std::mutex> lock(m_mtxData);
m_cvNewLogOrStoped.wait(lock, [this] {return this->m_bStop || !this->m_LogMsgQue.empty(); });
// 若组件被停止,且队列中不再有新的日志消息,则退出循环
if (m_bStop && m_LogMsgQue.empty())
break;
// 取出一条日志消息
log_msg = std::move(m_LogMsgQue.front());
m_LogMsgQue.pop();
}
//在此完成 将log_msg保存到文件 的处理
ofs << log_msg << std::endl;
}
// 确保日志文件被关闭
if (ofs.is_open())
ofs.close();
}
private:
bool m_bStop = true; // 组件是否被停止的标志,true表示已停止,false表示已启动
std::string m_strLogFilePath; // 日志被保存的文件路径
std::queue<std::string> m_LogMsgQue; // 日志消息队列
std::mutex m_mtxData; // 互斥量,保护队列的线程安全
std::condition_variable m_cvNewLogOrStoped; // 条件变量,检查是否有新的日志消息提交到队列,或组件被停止
std::thread m_thSaveLog; // 日志处理线程对象
};
}
#endif // !MYLOGGER_H
可以看到,为了实现这样一个组件,类内部至少需要维护一个队列和一个常驻线程,而且需要保证入列出列操作的线程安全。
这里队列我采用了STL中的std::queue<T>,并通过C++11中的线程(std::thread)、互斥量(std::mutex)、条件变量(std::condition_variable)的组合拳法来控制线程的启停与线程数据安全。当然,实现一个正经的异步日志组件需要面临和解决的问题远不止此,还有很多值得注意的细节并未体现,比如:
- Start()、Stop()本身并未保证线程安全;
- 日志处理线程中并未检查文件路径的有效性;
- 日志内容可以在保存之前进行压缩以节省磁盘空间;
- 使用无锁队列来替换本例中的队列以避免加锁造成的性能损失;
- ......
挑刺永远是挑不完的,但是没关系,这些问题并非本文探讨的重点。
使用该组件的流程:
- 创建一个MyLogger组件实例(logger);
- 在业务初始化阶段启动组件(logger.Start(...));
- 在需要输出日志的任何地方提交日志(logger.CommitLogMsg(...));
- 在业务终止阶段停止组件(logger.Stop())。
我们来看一下这个日志类的测试代码:
/*!
* \文件 main.cpp
* \作者 huge
* \日期 2022/06/28
* \描述 MyLogger组件测试代码,包含主函数
*/
#include "MyLogger.h"
using namespace logger;
int main()
{
MyLogger logger; //创建一个日志组件的实例
logger.Start("test.log"); //启动组件,将日志保存到"test.log"文件中
for (size_t i = 0; i < 100; i++)
{
//连续提交若干条日志
logger.CommitLogMsg(std::to_string(i + 1).data());
}
logger.Stop(); //停止组件
return 0;
}
可以看到用起来还是蛮简单和方便的。值得注意的是,当组件的使用者调用了Stop时,日志处理线程不见得会立即终止,因为仅当组件被停止且队列为空了线程才会退出循环而终止。
当然你完全可以选择将Start和Stop的实现分别移到构造函数和析构函数中,但是一般情况下不会这样做,原因是在大多数情况下,我们初始化这样的组件可能需要一些额外的工作,完成这些额外工作所需要的信息可能无法在构造时获取(因为构造可能太早了)。打个比方,我们可能在启动组件时需要根据当前应用程序所在的路径来确定日志文件保存路径,然而这个工作在构造函数中很有可能无法完成,因为构造时可能应用程序还没完成初始化,无法得到它所在的路径,因此不得不将此工作放到构造之后,提供专门的接口,由组件的使用者来调用接口完成。
测试代码仅在主线程中提交了若干条日志,实际上这个组件允许在多个线程中并发地向同一个组件实例提交日志。
经过以上问题的分析和解决,我们可以发现一个典型的模型,并将它广泛应用于其他类似的场景。
模型的理解,生产者-消费者模型的特性
这种通过队列作为桥梁,一端进行数据生产,另一端进行数据消费的场景在实际开发中屡见不鲜:
- 耗时的数据查询 - 提交查询请求就是生产,根据请求参数进行查询就是消费;
- 密集的消息推送:消息发布出去就是生产,接收并处理消息就是消费;
- 实时数据转存为历史数据:实时数据的更新和提交就是生产,取实时数据保存到磁盘就是消费;
- 线程池:提交待执行任务就是生产,取出任务并执行就是消费。
以上都是生产者-消费者模式的典型应用场景,所谓生产者就是产生数据的一方,消费者就是处理数据的一方,这些场景都有一些共同点:
- 生产操作是随时随处可能发生的,无法确定时机和频率的;
- 消费过程不是CPU密集的,但可能是比较耗时的,缓慢的;
- 消费过程是独占数据和相关资源的;
- 消费操作是不必立即知道结果的;
这些共同点正是生产者-消费者模型的特点,当我们面临某个场景,刚好具备这些特点时,我们完全可以考虑应用它。
优点:
- 解耦:一些场景下,我们甚至可以单独抽象出生产者类、消费者类、数据容器类,生产者类和消费者类不直接相互依赖,他们都依赖于数据容器类,这使得生产方和消费方得以解耦,两方可以独立发展、扩展升级。
- 支持并发:得益于缓冲队列的存在,我们可以在保证线程安全的前提下,并发地向队列提交数据,在任何线程进行生产操作;
- 支持闲忙不均:生产和消费解耦,那就可以你生产你的,我消费我的,慢速的消费不会影响快速的生产。
模型的抽象,如何实现一个通用的消费者组件
纵观这些经典的应用场景,我们好像可以从中抽象出一个稳定的框架,细心的你会发现,无论什么场景什么样的功能,实现它们的过程中我们都在重复干着这样一些事情:
- 我们总是需要向队列提交一些数据,不论数据是什么样的,生产操作总是将数据塞进队列;
- 我们总是要从队列中取出数据并处理它们,无论数据是什么样的;
这些是不变的,那么变化的是什么呢?
我们总是要生产和消费,仅仅是生产消费的数据类型以及对他们的消费处理实现不一样
既然如此,我们完全可以抽象出一个生产者-消费者组件类模板(Class Template of Producer-Consumer Component),它至少具有以下行为:
- 启动组件 - 创建一个队列和一些消费者线程,它们时刻准备着从队列中安全地取出数据进行消费
- 停止组件 - 优雅地结束消费者线程,释放一些相关资源
- 生产提交 - 向队列提交一条数据
这些行为是稳定的,是所有支持生产者-消费者模型的组件都应当具备的接口。那么问题来了,数据类型和数据处理的实现是变化的,这应当如何是好呢?
数据类型好想办法,反正生产和消费操作的数据一定是相同的类型,因此我们可以把数据类型作为类模板的参数,队列中保存的数据的类型就是这个模板参数指示的类型,具体的类型交由使用者来决定。
关于数据处理,有一种办法叫做回调(Callback),我们把不能当场确定的处理过程交给回调函数,让回调函数的实现者来完成具体的处理。在生产者-消费者模型中,我们可以把消费操作变成一个回调函数,将待消费的数据作为回调函数的入参,我们只需要在消费的时候调用回调函数即可完成消费操作。
理清楚了这些,我们应当可以开始着手开展组件类模板的具体实现了,让我们大干一场(直接参考上面的日志组件,稍微修改一下):
#ifndef CONSUMER_H
#define CONSUMER_H
#include <thread>
#include <mutex>
#include <queue>
#include <functional>
#include <condition_variable>
/**
* \名称 csm
* \作者 huge
* \日期 2022/06/28
* \描述 消费者组件类模板的命名空间
*/
namespace csm
{
/**
* \类名 Consumer<T>
* \作者 huge
* \日期 2022/06/28
* \描述 消费者组件类模板(带单个消费线程)
*/
template<typename T>
class Consumer
{
public:
/**
* 描述: 启动组件
* 访问: public
* 返回: bool 启动成功返回true,否则返回false
*/
bool Start()
{
//创建单个消费者线程
if (m_bStoped)
{
m_bStoped = false;
m_thConsumer = std::thread(&Consumer<T>::fnConsumer, this);
}
return true;
}
/**
* 描述: 停止组件
* 访问: public
* 返回: void
*/
void Stop()
{
if (!m_bStoped)
{
//将m_bStoped置true后唤醒所有消费者线程,使其安全结束
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_bStoped = true;
}
m_cvNewDataOrStoped.notify_all(); //唤醒所有等待在此条件上的线程,让它们优雅地退出
if (m_thConsumer.joinable())
m_thConsumer.join(); //等待消费者线程结束
}
}
/**
* 描述: 提交一条数据
* 访问: public
* 返回: bool 提交成功返回true,否则返回false
* @ const T & data: 待消费的数据
* 备注: 若组件未启动,则提交会失败返回false
*/
bool Commit(const T& data)
{
if (m_bStoped)
return false;
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_DataQue.push(data);
}
m_cvNewDataOrStoped.notify_one();//唤醒某个消费线程进行消费
}
using Consumer_Callback_t = void(Consumer<T>*, const T&); //消费操作回调原型
/**
* 描述: 设置消费操作回调处理函数
* 访问: public
* 返回: void
* @ Consumer_Callback_t callback: 回调函数实例
* 备注: 实参可以传入函数指针,lambda等任何可调用对象
*/
void SetComsumerCallback(Consumer_Callback_t callback)
{
m_ConsumerCallback = callback;
}
protected:
void fnConsumer()
{
//线程即将进入消费循环,在此初始化资源
while (true)
{
T data; //用于缓存最新取出的单条数据
{
//检查队列中是否仍有待消费的数据,或者外部停止了组件
std::unique_lock<std::mutex> lock(m_mtxData);
m_cvNewDataOrStoped.wait(lock, [this] {return m_bStoped || !m_DataQue.empty(); });
//若外部停止了组件,且队列中没有待消费的数据,可以退出线程
if (m_bStoped && m_DataQue.empty())
break;
//否则一定是队列中还有待消费数据,取出一条消费之
data = std::move(m_DataQue.front());
m_DataQue.pop();
}
//数据消费处理(回调,由组件的使用者者来实现具体的消费处理)
if (m_ConsumerCallback)
m_ConsumerCallback(this, data);
}
}
private:
bool m_bStoped = true; //标识组件是否已经停止运行,true表示已停止,false表示已启动
std::mutex m_mtxData; //数据互斥量
std::condition_variable m_cvNewDataOrStoped; //条件变量,检查是否有新数据提交或组件被停止
std::queue<T> m_DataQue; //数据队列
std::thread m_thConsumer; //消费者线程对象
std::function<Consumer_Callback_t> m_ConsumerCallback; //单次消费操作的回调函数
};
}
可以看出这份实现与与上面的日志类(MyLogger)非常相似,只是发生了以下改变:
- 类名由MyLogger换成了Consumer;
- 类变成了变成了类模板;
- 数据类型由std::string变成了类模板参数T;
- 消费处理过程变成了一个回调,在类中新增一个保存回调函数的成员m_ConsumerCallback(实际上是个std::function对象),并提供一个新的接口,以便组件的使用者可以指定具体的回调函数;
其它的变化几乎不值一提。我们来看一下这样的组件用起来什么效果:
/*!
* \文件 main.cpp
* \作者 huge
* \日期 2022/06/28
* \描述 Consumer<T>组件测试代码,包含主函数
*/
#include "Consumer.h"
using namespace csm;
int main()
{
using data_t = int; // 生产消费操作的数据类型,这里指定为int
Consumer<data_t> obj; // 创建一个面向int类型的消费者组件实例
//设置自定义的消费处理实现,这里是lambda形式,消费操作仅仅是在控制台打印一下数据内容
obj.SetComsumerCallback([](Consumer<data_t>* pSender, const data_t & data) {std::cout << "Consume:[" << data << "]." << std::endl; });
obj.Start(); //启动组件
for (int i = 0; i < 10; i++)
{
obj.Commit(i + 1); //向组件提交若干条数据
}
obj.Stop(); //停止组件
return 0;
}
以下是测试代码运行结果截图:
可以看出来使用挺方便,没啥大毛病,测试程序在主线程启动组件后连续向组件提交了10个整数,它们都依次被成功消费(使用控制台输出操作模拟消费过程,当然,它也可以是任何复杂的处理过程)。整个应用程序中,除了Commit()接口以外,其他接口(Start()、Stop()、SetComsumerCallback())只需要调用一次。用起来好像也不是很复杂,但总感觉还不够优雅,是哪里不对劲呢?
其实是因为样例代码中处理的问题过于简单,例子中仅仅是安排了一个面向int类型的生产者-消费者场景,消费操作也只是简单的输出来模拟。然而实际开发过程中,我们面向的数据类型是复杂的,需要实现的具体业务也是复杂的。我们来考虑以下场景:
假设我们现在需要实现"用户操作记录保存到MySQL数据库"这样一个功能,用户操作记录包含一些重要字段(如用户名、操作时间、操作类型、操作结果等,具体哪些字段在本例中不太重要),我们需要将用户产生的关键操作以操作记录的形式,一条条保存到指定的MySQL数据库中。显然完成这个小小的功能主要由3个核心的工作:
- 在初始化阶段,应用程序需要建立与MySQL数据库的连接;
- 当用户发生关键操作时,需要将操作记录数据缓存起来,并在单独的线程中构建MySQL命令以完成数据保存操作;
- 业务终止时(如应用程序退出等),需要关闭与MySQL的连接。
在上面这种场景中,显然要运用生产者-消费者模型,通过一个MySQL长连接不断地执行SQL语句来实现这个功能。
然而我们无法使用当前这种设计的Consumer来完成目标功能,因为当前的设计中,作为Consumer的使用者仅仅有权指定消费操作如何实现,也就是说我们最多可以指定如何构建保存操作记录的SQL语句,但这远远不够,执行SQL语句必须通过MySQL连接对象来完成,可是这个Consumer根本没有给我们获取数据库连接的机会(我们总不能在每次需要消费时去连MySQL->执行SQL命令->关闭MySQL连接吧),除此之外,Consumer并没有在进入消费循环之前帮我们建立好MySQL连接(因为他作为通用组件,并不知道需要做这件事),当然也不会在消费线程终止时帮我们关闭MySQL连接了,这些工作Consumer不会做,也没有开放出来给我们使用者来做。
因此,要实现一个通用的消费者组件,我们还需要考虑以下两个实际问题:
- 开放资源初始化/释放处理:消费线程进入消费循环之前,可能需要针对具体的业务进行一些资源初始化工作(如创建网络连接、打开其他文件、设置flag等),消费线程退出时也需要完成这些资源释放工作(关闭socket、释放fd,清除flag等),这两部分工作可能需要可能不需要,如果需要,那么这些处理过程也是灵活的,然而当前的设计并未考虑到这些过程,更不用提将这部分处理过程的实现开放给组件的使用者这回事了。
- 允许维护额外的数据信息:即便针对问题1,新开放两个回调(StartingCallback、StoppedCallback)供组件的使用者来自定义资源初始化(Initialize)和资源释放(Uninitialize)过程处理,我们还是会面临回调函数本身带来的局限性,以本例中的消费操作回调(m_ConsumerCallback)为例,该回调函数的原型中仅仅包含了组件实例的地址和待消费的数据(的const引用)两个入参,如果完成消费操作还需要额外的信息(如需要获取其他配置信息、数据库连接、socket等),回调函数恐怕就不太好用了,毕竟这些额外的信息只能通过其他数据传递方式来获得,而不能通过传参,这是回调函数的特性所决定的;
关于问题1,我们当然可以把资源初始化、资源释放两个过程也做成回调,交给组件的使用者来实现,然而正如问题2中提到的那样,回调函数这种机制本身就存在数据传递方面的局限性。因此问题2才是根本问题,如何解决问题2呢?
首先来分析以下,一个消费者组件,如果能够把资源初始化、消费处理、资源释放这3部分的实现开放出来,交给组件的使用者来实现,这样的组件已经基本具备通用性了,换而言之,我们可以认为一个通用的消费者组件一定具有这3个行为,只是行为的具体实现是待定的(变化的,需要在未来才能具体实现的)。
分析到这一层,我们已然可以运用面向对象设计(OOD)的思想回过头来思考。可以将回调函数改为回调类(叫法有很多,业内常常称这种类为Handler或者Interface,以下统称Handler好了),也就是说,组件内部我们不再维护一个或多个回调函数(地址)了,直接保存一个Handler对象的地址(注意,是一个类对象指针),这个Handler类中约定了所有可能发生的行为(或者说需要处理的事件的接口),原来需要回调的那些函数就变成了这个Handler类的成员函数,原本需要回调的地方(如m_ConsumeCallback(this,data))都变成通过这个Handler对象来调用接口(m_handler->OnConsume(this,data))。Handler类应当长这个样子:
//消费者组件类模板的先导声明,以被Handler类识别
template<typename T>
class Consumer;
/**
* \类名 IConsumerHander<T>
* \作者 huge
* \日期 2022/06/28
* \描述 消费者组件(Consumer<T>)的Handler类
* 它定义了消费者组件可能发生的所有行为(或事件)
*/
template <class T>
class IConsumerHander
{
public:
/**
* 描述: 消费者组件刚刚启动
* 访问: virtual public
* 返回: bool 处理成功请返回true,否则返回false
* @ Consumer<T> * pSender: 指向发生此事件的消费者组件实例
* 备注:
* 此事件发生在消费者线程刚刚创建之后,进入消费循环之前,你可以在此初始化消费操作需要的相关资源,如建立网络连接、加载文件等
* 返回值将直接决定消费者线程是否进入消费循环,返回true将进入消费循环,返回false直接退出消费线程,停止组件
*/
virtual bool OnStart(Consumer<T>* pSender) { return true; }
/**
* 描述: 发生单次消费动作
* 访问: virtual public
* 返回: bool 消费成功请返回true,否则返回false
* @ Consumer<T> * pSender: 指向发生该事件的消费者组件实例
* @ const T & data: 待消费的数据
* 备注:
*/
virtual bool OnConsume(Consumer<T>* pSender, const T& data) = 0;
/**
* 描述: 消费者组件正在停止
* 访问: virtual public
* 返回: bool 操作成功请返回true,否则返回false
* @ Consumer<T> * pSender: 指向发生该事件的消费者组件实例
* 备注:
* 你可以在此释放该组件启动时初始化的相关资源(如果需要的话),如断开网络连接、关闭文件等
* 该接口的返回值暂时不对组件产生任何影响
*/
virtual bool OnStop(Consumer<T>* pSender) { return true; }
public:
virtual ~IConsumerHander() {}
};
可以看到,这个ConsumerHandler类被设计为一个抽象类模板,三个接口都是虚函数,其中OnConsume()是纯虚的,因此它不能实例化,必须通过继承它,并在它的子类中至少重写消费操作接口(OnConsume),这样的子类才能实例化。
- 之所以没有把OnStart、OnStop设计成纯虚接口,是考虑到这两部分处理是可选做的,某些场景下可能不需要进行资源的初始化和释放,因此它俩在ConsumerHandler中拥有默认的实现,而不必让继承者重写它们;
- 之所以设计为类模板,是因为所有行为处理面向的数据类型和Consumer组件是一致的,因此将数据类型作为模板参数,这与Consumer组件是统一的。
你有可能会问:为什么Handler(回调类)就能解决回调函数带来的数据传递问题呢?
是这样的,普通函数一般来说都是面向传入的参数进行处理,即便我们有办法在普通函数中访问一些入参之外的数据,那也是不方便的,也不优雅。Handler是个类,可以创建对象,相比于普通函数,它有个优点就是不仅具有行为(成员函数),还能维护信息(成员数据),我们完全可以在Handler类中保存一些回调函数入参之外的数据(如socket,cache,flag等),以帮助我们完成回调函数的功能。
有了这个ConsumerHandler类,如何让它发挥作用呢?这需要ConsumerHandler和Consumer通力合作,各自完成各自的使命:
- ConsumerHandler的使命:重写(Override)。组件的使用者必须在ConsumerHandler的子类中实现消费处理(当然他也可以酌情重写初始化(OnStart)和资源释放(OnStop)两个接口,但不是必须的),然后将ConsumerHandler的子类实例的地址传给Consumer组件。
- Consumer组件的责任:回调(Callback)。Consumer保存了一个ConsumerHandler对象指针,它将指向一个ConsumerHandler的子类实例,在适当的时机通过ConsumerHandler对象指针来调用相应的接口(组件启动时调用OnStart,组件停止时调用OnStop,需要进行消费操作时调用OnConsume),不同的ConsumerHandler的子类实例决定了消费操作的不同处理实现。
因此Consumer组件应当变成这样:
/**
* \类名 Consumer
* \作者 wangxl
* \日期 2022/06/23
* \描述 消费者组件类模板
* \备注 要求编译器支持C++11或更高语言标准
* 该组件实现了单消费者线程模型,模板参数标识消费队列中的数据类型
* [注意事项]:
* 构造时必须传入一个IConsumerHander<T>回调实例的非空地址
*/
template<typename T>
class Consumer
{
public:
/**
* 描述: 带参构造函数
* 访问: public
* 返回:
* @ IConsumerHander<T> * pHandler: 指向组件回调实例
* 备注:
* 1.若指定pHandler为空(nullptr),则组件启动不会成功;
* 2.由于回调在消费者线程中发生,故请务必保证回调处理中的代码是线程安全的
*/
Consumer(IConsumerHander<T>* pHandler) : m_pHandler(pHandler) {}
/**
* 描述: 析构函数
* 访问: virtual public
* 备注: 析构时停止组件
*/
virtual ~Consumer()
{
Stop();
}
public:
/**
* 描述: 启动组件
* 访问: public
* 返回: bool 启动成功返回true,否则返回false
*/
bool Start()
{
if (m_bStoped)
{
m_bStoped = false;
m_thConsumer = std::thread(&Consumer<T>::fnConsumer, this);
}
return true;
}
/**
* 描述: 停止组件
* 访问: public
* 返回: void
* 备注:
* 若组件已经停止,则不会做任何事;
*/
void Stop()
{
if (!m_bStoped)
{
//将m_bStoped置true后唤醒所有消费者线程,使其安全结束
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_bStoped = true;
}
m_cvNewDataOrStoped.notify_all(); //唤醒所有等待在此条件上的线程,让它们优雅地退出
if (m_thConsumer.joinable())
m_thConsumer.join(); //等待消费者线程结束
}
}
/**
* 描述: 向组件提交一条数据(单次生产操作)
* 访问: public
* 返回: bool true表示提交成功,false提交失败
* @ const T & data: 提交的数据对象
* 备注: 若组件未启动,则不会提交数据,因此提交数据前请先启动组件
*/
bool Commit(const T& data)
{
if (m_bStoped)
return false;
//数据入列
{
std::unique_lock<std::mutex> lock(m_mtxData);
m_DataQue.push(data);
}
m_cvNewDataOrStoped.notify_one(); //唤醒某个消费者线程,使其消费刚刚提交的新数据
return true;
}
protected:
/**
* 描述: 消费者线程函数
* 访问: virtual protected
* 返回: void
*/
virtual void fnConsumer()
{
//线程即将进入消费循环,在此初始化资源
auto bStartFlag = m_pHandler && m_pHandler->OnStart(this);
if (bStartFlag)
{
while (true)
{
T data; //用于缓存最新取出的单条数据
{
//检查队列中是否仍有待消费的数据,或者外部停止了组件
std::unique_lock<std::mutex> lock(m_mtxData);
m_cvNewDataOrStoped.wait(lock, [this] {return m_bStoped || !m_DataQue.empty(); });
//若外部停止了组件,且队列中没有待消费的数据,可以退出线程
if (m_bStoped && m_DataQue.empty())
break;
//否则一定是队列中还有待消费数据,取出一条消费之
data = std::move(m_DataQue.front());
m_DataQue.pop();
}
//数据消费处理
auto flag = m_pHandler && m_pHandler->OnConsume(this, data);
}
}
//组件即将停止,在此释放资源
m_pHandler && m_pHandler->OnStop(this);
}
protected:
bool m_bStoped = true; //标识组件是否已经停止运行,true表示已停止,false表示已启动
std::mutex m_mtxData; //数据互斥量,保证队列操作的线程安全
std::condition_variable m_cvNewDataOrStoped;//条件变量,检查是否有新数据提交或组件被停止
std::queue<T> m_DataQue; //数据队列
std::thread m_thConsumer; //消费者线程对象
IConsumerHander<T>* m_pHandler = nullptr; //组件回调实例(的地址)
代码其实不长,只是因为注释比较详细。细心的你会发现,这不正是利用了C++多态的特性么,消费处理一定会由未来的组件使用者在ConsumerHandler的子类来实现,这一点得到了保证,不然就无法创建一个Handler(的子类实例),更无法将它传递给Consumer组件了。
如果你了解过常用的设计模式,那么你一定对策略模式(Strategy)还有一些印象(没有印象没关系,我会在此简单的梳理一下)。策略模式告诉我们:可能发生的行为或事件是确定的(稳定的),实现(或响应)它们的处理是灵活的(变化的),如果你面临的场景拥有这样的特点,那么恭喜你,这正是运用策略模式的绝好机会。巧了嘛不是,上面这种Handler的设计,就是策略模式的一种体现,让我们来对号入座:
- 可能发生的行为或事件是稳定的:拢共就3个可能的行为或事件,组件启动OnStart,组件停止OnStop,单次消费OnConsume;
- 实现(或响应)它们的处理是变化的:由不同的Handler子类来决定不同的行为实现。
这不是策略模式是什么。所以说我们还顺带着发现了一个真相:Handler其实就是Strategy。
有了这样一个通用的Consumer组件,它可以在哪些地方应用呢?我们就回头看看前面提到的假想需求-"用户操作记录保存到MySQL数据库",现在我们已经可以使用Consumer来实现它了。只需要:
- 在OnStart()中完成建立MySQL连接的工作(当然,连接对象需要被维护成类的成员);
- 在OnConsume()中通过MySQL连接对象执行SQL语句以完成记录的保存工作;
- 在OnStop()中断开与MySQL连接。
程序世界中充满了Callback,最后让我们也来一次Callback。看看如果通过上面的Consumer+ConsumerHandler实现我们最初想要实现的异步日志组件:
/*!
* \文件 MyLogger.h
* \作者 huge
* \日期 2022/06/28
* \描述 日志组件类源码,基于Consumer + ConsumerHandler通用组件实现了一个简单的异步日志输出的组件
*/
#ifndef MYLOGGER_H
#define MYLOGGER_H
#include <fstream>
#include "Consumer.h"
using namespace csm;
namespace logger
{
/**
* \类名 MyLogger
* \作者 huge
* \日期 2022/06/28
* \描述 基于Consumer + ConsumerHandler通用组件实现了一个简单的异步日志输出的组件
*/
class MyLogger : public csm::IConsumerHander<std::string>
{
public:
MyLogger() : m_LogConsumer(this) {} //构造是将this指针作为Handler传入消费者组件
public:
/**
* 描述: 启动日志组件
* 返回: bool 启动成功返回true,否则返回false
* @ const char * lpszLogFilePath: 日志保存的文件路径
*/
bool Start(const char* lpszLogFilePath)
{
if (!lpszLogFilePath)
return false;
m_strLogFilePath = lpszLogFilePath; //保存日志文件路径
return !m_strLogFilePath.empty() && m_LogConsumer.Start(); //启动消费者组件
}
/**
* 描述: 停止日志组件
*/
void Stop()
{
m_LogConsumer.Stop(); //停止消费者组件
}
/**
* 描述: 提交一条日志消息到队列中
* 返回: bool 提交成功返回true,否则返回false
* @ const char * lpszLogMsg: 待提交的日志消息字符串
*/
bool CommitLogMsg(const char* lpszLogMsg)
{
return lpszLogMsg && m_LogConsumer.Commit(lpszLogMsg); //向消费者组件提交一条日志消息
}
// 通过 IConsumerHander<std::string> 继承
public:
/**
* 描述: [日志组件刚刚启动]事件处理
* 备注: 我们只需要在此打开目标日志文件即可
*/
virtual bool OnStart(Consumer<std::string>* pSender) override
{
m_ofs.open(m_strLogFilePath, std::ios::out | std::ios::app);//以追加写方式打开目标文件
return m_ofs.is_open();
}
/**
* 描述: [新的日志消息到来需要保存]事件处理
* 备注: 我们只需要在此将日志消息保存到文件即可
*/
virtual bool OnConsume(Consumer<std::string>* pSender, const std::string & data) override
{
m_ofs << data << std::endl;//保存单条日志消息到文件流
return true;
}
/**
* 描述: [组件刚刚停止]事件处理
* 备注: 我们只需要在此检查关闭日志文件即可
*/
virtual bool OnStop(Consumer<std::string>* pSender) override
{
if (m_ofs.is_open()) //关闭目标文件
m_ofs.close();
}
private:
std::ofstream m_ofs; //日志文件流对象,用于写入日志消息
std::string m_strLogFilePath; //日志文件路径
csm::Consumer<std::string> m_LogConsumer; //面向std::string的消费者组件
};
}
#endif // !MYLOGGER_H
MyLogger日志组件实际上就是一个面向std::string的消费者组件,因此它内部拥有一个Consumer<std::string>实例作为成员,有了这个成员,MyLogger自己轻松了不少,它几乎什么都不需要做,仅仅做了以下工作:
- 在组件启动时打开目标日志文件
- 发生消费时将日志消息追加到文件流
- 组件停止时关闭目标日志文件
这太轻松了,原来需要维护的日志消息队列和日志保存线程不再需要维护了,因为它们都已经被内部的Consumer组件实现了,MyLogger仅仅需要完成日志保存的具体实现,以及在适当的时机打开和关闭文件即可,MyLogger可以说被大大地解放了。
更重要的是,即便发生了这样翻天覆地的变化,对于MyLogger的使用者来说,什么也感受不到,换而言之,MyLogger的在用法上根本没有发生变化。这对MyLogger的使用者来说也算得上一件美事了。
这样的Consumer还可以实现很多类似的组件,哪怕是线程池也不在话下,我们可以想一下,线程池其实也是生产者-消费者模型,它特殊的地方在于,生产消费的数据不是普通的数据,而是包含了参数的一个待执行处理,我们可以把这种数据叫做task,一个task要执行,除了需要指定处理代码以外,还要指定处理的参数值,执行task其实就是线程池的消费操作,消费线程就是线程池的核心线程。只要我们设计一种能够保存task这种东西的数据类型,就可以把一个个task塞进队列,然后在消费线程中取出task执行,就实现了线程池的核心功能。
当然了,这样的线程池有很多缺点,比如它只有一个核心线程并且无法调整线程数、无法获取提交的task的执行结果等等,但是不可否认它确实算一个线程池。
好啦,我们终于搞出了一个具备通用性的消费者组件了,关于生产者-消费者模型的抽象思考到这里可以告一段落了。当然这并不意味着这样的Consumer + ConsumerHandler就已经完美了,还差的远呢,但是核心的思想已经被我们有一定深度的认识了,哪怕是比较浅显的认识。如果本文对读者能有些帮助,那我也不算白忙活了。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)