时序数据的内存服务

2023-11-06

说明

既要坚定锻炼成熟架构的道路,也要在合理的范围内重塑设计

计算时序数据的特征,少不了“Rolling”类的操作。过去,直接采用pandas进行rolling,效率很不错,但是在实战应用时不太行。

反思下来:离线的操作拓展困难,很多都是一次性的

后来开发了ADBS,通过Mongo和Redis,在数据的持久化和吞吐上是没问题的。但是,对于全量的历史回滚计算遇到了问题。

ADBS的架构将问题简化到了一个时隙,大大简化了逻辑,开发完成也即生产完成,这点很好;但是在大量的全量计算中,Mongo库还是碰到了“大频次”访问的问题。目前我关注的时隙大约是60万,也就是说各Worker会频繁的发出数据查询请求。最终优化下来,处理的时间大约要1.5天。实时计算方面倒没什么问题,(每分钟计算300时隙 – 这不就是 TPS = 5吗?)

之前纠结的点在于使用硬件去堆高能力,而不去做设计上优化。但今天转念一想,不能轴啊,毕竟可以用很小的代价来实现的。

内容

1 问题本质

可使用内存服务的点在于:

  • 1 数据不算很大,60万行数据,甚至占不了几百兆(内存装得下)
  • 2 查询的条件很简单,就是其实和终止时隙(不需要额外的算法开销)
  • 3 数据重复被取的次数会非常多(1个时隙可能会重复取数千次)
  • 4 应用面广(假设会经常的增改新的指标)
  • 5 几乎没有修改成本(只要在worker的取数环节改为向这个服务取数即可),可能需要给worker一个参数让它切换

TPS = 5 ,那么一天计算约40万次计算;因为我把时隙定在分钟,所以应该是TPM=300。

从吞吐的指标上看,至少要能达到TPS = 100或者 TPM = 6000这个水平。这样处理速度快20倍,不到1个小时就能跑完全量。

2 方案

构造一个服务,服务的主要目的是将这种滚动式的数据查询从数据库中挪到内存中

服务的构造上,在这个场景用Tornado是最合适的,不过我不想花时间去搞;还是要用Flask搭建,未来有时间了,还是应该把Tornado的模板搞一搞,又轻又快。

服务能够提速的原因在于:

  • 1 读写只在内存,比硬盘能快几十倍
  • 2 不需要数据库执行通用的条件筛选(可能从百毫秒级别,降到毫秒或者微秒级)

顺带的能稍微节约一下硬盘的耐久度消耗(虽然有时候我也暗搓搓的希望硬盘用坏了可以买新的,哈哈)。

服务最大的开销可能就是将结果进行序列化,然后在网络(局网)中传输了。

其实如果是非滚动的计算,当前的架构是完全可以满足的,所以新的服务应该只是这个特殊场景的补充,用最小化的功能来进行设计。

  • 之所以不使用redis,是因为还要进行筛选(不仅仅是kv),没那么方便

2.1 取数接口

按 random_seed ,从某个库拖取区间数据

接口的入参需要指定:

  • 1 数据库服务名称:服务会自动进行合适的连接,然后将表中的所有数据下载

接口服务在拉取数据后存入服务的内存变量(字典):

  • 1 键值即为数据库服务名称
  • 2 返回数据的条数
  • 3 返回数据对象(df)所占的内存空间

在不使用时,通过重启容器来释放内存

2.2 返数接口

和原来查表同样的入参

一般来说,通过一组变量作为联合主键来进行数据的筛选,然后通过时隙的比较来进行取数,之后将结果作为listofdict返回。

3 实施一

采用了Tornado做网络服务,的确是比Flask强很多。

单核情况下,T的处理大约是120ms~150ms, F的处理 一般在200ms。所以T的TPS是7,而F的TPS是5。

在概念上,我有两个认知盲点或者错误:

  • 1 这个取数不是IO密集,是CPU密集操作
  • 2 进程间数据不共享

取数的时候,如果直接从Mongo取,那么需要数据库进行查询,这个会耗用CPU;当采用内存处理时,从python的对象转为json传输,也要耗用CPU。可能后者的耗用还会大一些。

因为现在都是在使用MA进行数据操作,MA为了标准化操作,也会进行转换,所以我觉得效率都消耗在这上面。

速度的确提高了很多,本以为这就是这个故事的结果,没想到又反转了。

实施一的问题

tornado在多核运行时出现了内存暴增的问题。我估计在多核状态下,可能还会有很多进程会被随机创建,然后每个进程会去获取静态数据(是的,肯定不是十个核拿十次,而是有多少进程多少次)。所以多核的时候,内存方案会有问题。我想以前碰到的一些(其他人)的工程上内存爆掉的问题,可能有一部分是类似这样的。python因为天生是单进程的,所以不太会体会到。

我觉得,如果要多核操作,一定还是要面向数据库的,还是转向redis找解决方案

4 实施二

擦,突然发现其实我已经有了答案,灯下黑啊。

我现在往Redis的Stream不就是存取数据,然后分发执行吗?效率很高啊。只是有一个地方需要改变,就是消息的id用我的时隙号替代就可以了…

甚至再往前一步,所有的实时数据可以一直往这个队列加啊,之前都是什么鬼方案,哈哈。

在python里使用redis操作,原来我就是没有使用id这个关键字。
在这里插入图片描述

所以只要在现有的redis agent里加一个接口就行(10分钟都不用),完全不影响其他的操作。这样,就把这个问题重新简化为了IO问题。之前也看过,这60多万的内存占用应该也就200多M。

要点

  • 1 在RedisAgent中添加一个接口,可以方便的生成一个队列(主要是为了方便)
  • 2 在Worker中不通过Agent,直接访问Redis数据库

将mongo中的数据都搬到redis(60多万条数据,耗时7分钟左右)


100%|██████████| 115/115 [06:49<00:00,  3.57s/it]

from configs_base import redis_agent_host,project_name, worker01_config,cur_w

tier1 = 'MyQuantBaseStep1Signals'
tier2 = 'step1_mongo_in'


min_slot = cur_w.minmax(tier1 = tier1, tier2 = tier2, minmax='min', attrname='data_slot')['data']
max_slot = cur_w.minmax(tier1 = tier1, tier2 = tier2, minmax='max', attrname='data_slot')['data']
print(min_slot)
print(max_slot)

slot_tuple_list = slice_list_by_batch1(int(min_slot),int(max_slot)+1,50000)

redis_q_name = 'BUFF.%s.%s' %(tier1, tier2)
import tqdm 
for tem_tuple in tqdm.tqdm(slot_tuple_list):
    min_time_slot, max_time_slot = tem_tuple
    recs = cur_w.query_recs(tier1 = tier1, tier2 = tier2,
        filter_dict = {'$and':[{'market':market,
            'code':code, 'data_slot':{'$gte':min_time_slot ,'$lt':max_time_slot}}]},
            limits= (max_time_slot - min_time_slot) * 10,silent=True
            )['data']
    recs_df = pd.DataFrame(recs)
    recs_df['_data_source_ranking'] = recs_df['data_source'].map(data_source_ranking).fillna(999)
    recs_df1 = recs_df.sort_values(['_data_source_ranking']).drop_duplicates(['market','code','data_slot'])


    tem_msg_id_list
    keep_cols = [x for x in recs_df1.columns if not x.startswith('_')]
    recs_df1 = recs_df1.sort_values(['data_slot'])
    tem_msg_dict_list = recs_df1[keep_cols].to_dict(orient='records')
    tem_msg_id_list = list(recs_df1['data_slot'].apply(int))

    req.post(redis_agent_host + 'batch_add_msg_with_id/',
            json={'stream_name':redis_q_name,
                    'msg_dict_list':tem_msg_dict_list, 
                    'msg_id_list': tem_msg_id_list,
                'maxlen':10000000}).json()

Worker从Redis取数

import redis
lq = redis.Redis()

def flat_kvlist2dict(some_list):
    tem_dict = {}
    for i in range(0,len(some_list),2):
        k = some_list[i]
        v = some_list[i+1]
        tem_dict[k] = v
    return tem_dict

%%timeit 
stream_name = redis_q_name
start_id = 28001000
end_id = 28023010
count = 100000
cmd = 'xrange %s %s %s count %s' %(stream_name,start_id,end_id,count)
print(cmd)

res = lq.execute_command(cmd)
res_df = pd.DataFrame(res)
msg_body_df = pd.DataFrame(res_df[1].apply(flat_kvlist2dict).tolist())

每次的取数大约20多毫秒(TPS ~ 50),看起来不再需要考虑序列化的问题(一来一去,每次全量减少120万次的json序列化工作)
在这里插入图片描述

再之后修改Worker的取数方式就好了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

时序数据的内存服务 的相关文章

随机推荐

  • OpenCV-Python快速入门(十四):模板匹配

    OpenCV Python快速入门 十四 模板匹配 前言 前提条件 实验环境 模板匹配 cv2 matchTemplate 匹配单个结果 匹配多个结果 参考文献 前言 本文是个人快速入门OpenCV Python的电子笔记 由于水平有限 难
  • 【C语言】C语言 atoi 函数解析

    个人主页 简 料 所属专栏 C语言 个人社区 越努力越幸运社区 简 介 简料简料 简单有料 在校大学生一枚 专注C C GO的干货分享 立志成为您的好帮手 C C 学习路线 点击解锁 C语言 初阶数据结构与算法 C 高阶数据结构 Linux
  • 万劫不复之地-云原生可观测性的几大误区

    传统监控厂商正把可观测性引入万劫不复之地 可观测性是当前讨论非常多的话题 这个理念由来已久 却在最近开始流行 在20世纪60年代 该理念首次由Rudolf E Kalman在其论文中提出 论文题目是 on a general theory
  • Python中range()函数的用法

    先列几个range 函数的几个用法 函数原型 range start end scan 参数含义 start 计数从start开始 默认是从0开始 例如range 5 等价于range 0 5 end 技术到end结束 但不包括end 例如
  • 配置环境变量后,mysql依旧提示“'mysql' 不是内部或外部命令,也不是可运行的程序或批处理文件”

    在启动菜单搜索 cmd 搜索到后不要点击运行 右击选择以管理员的身份运行 之后输入mysql u 用户名 p 密码 就可以了 在此输入 cmd 找到 命令提示符 右击选择 以管理员的身份运行 之后正常操作就可以了 ps 此外如果之前操作没问
  • Docker安装使用记录

    Docker使用 Docker简介 Docker 架构 Docker安装 Docker CE 镜像源站 使用官方安装脚本自动安装 仅适用于公网环境 手动安装帮助 阿里云ECS可以通过内网安装 见注释部分内容 Ubuntu 14 04 16
  • Linux笔记:命令进阶使用相关功能

    文章目录 目的 通配符 转义符 流程控制 管道符 重定向 环境变量 总结 目的 使用linux时用户可以通过一个又一个的命令来完成各种操作 除了基础的各个命令外linux还提供了各种便利的功能来协调使用这些命令 这些功能让用户在使用linu
  • jenkins 持续集成/项目部署

    前置操作可查阅 docker 安装 jenkins https xijia blog csdn net article details 127021367 spm 1001 2014 3001 5502 win安装及 jenkins 前置配
  • C5.0决策树算法及性能提升

    C5 0算法是基于C4 5开发的新版本 它能适用于很多类型的问题 同神经网络 支持向量机等复杂算法相比 它几乎可以表现地一样优秀 并且更容易理解和部署 这里我们将用UCI机器学习网站http archive ics uci edu ml i
  • GPIO使用教程(学习笔记)

    目录 前言 一 GPIO简介 1 1GPIO框图讲解 二 需驱动的寄存器 2 1时钟 2 2总线 2 3时钟配置 2 4寄存器配置 三 编程环境搭建 3 1完整程序 四 小结 前言 本文主要讲解如何驱动GPIO外设的相应寄存器搭建GPIO的
  • MyBatis 学习笔记:Java 中的数据持久化框架

    MyBatis 学习笔记 Java 中的数据持久化框架 数据持久化是大多数应用程序的关键需求之一 在 Java 开发中 MyBatis 是一个流行的数据持久化框架 它提供了一种简单而强大的方式来管理数据库访问 本文将介绍 MyBatis 的
  • 出门旅行懂链改这些问题都好解决

    每到旅游热潮结束后 各大平台都会纷纷晒出出游 成绩单 国内很多人都是 报复式出游 各大景区都是人山人海 有些人旅游是为了修身养性 放松身心 有些人是为了观赏大自然风光 享受一场视觉盛宴 有些人是为了探索未知事物 追求一种差异化 个性化的旅游
  • 环形队列设计思路

    环形队列设计思路 一 数据结构 数据存储在一段连续的内存空间 通过写位置 读位置来控制数据的输入输出 二 数据操作 1 判断空逻辑 写位置 读位置 2 判断满逻辑 写位置 1 MAX SIZE 读位置 3 写数据逻辑 判断数据不满 在当前写
  • GPG error: http://debian.cn99.com testing Release: Unknown error executing gpgv

    mail apt get updateGet 1 http debian cn99 com testing Release gpg 189B Hit http debian cn99 com testing ReleaseErr http
  • Servlet上传文件

    一 核心方法 1 HttpServletRequest类相关方法 方法 描述 Part getPart String name 获取请求中给定name的文件 Collection
  • java爬虫,提供链接直接爬取网页代码

    其实我只想要爬到整个网页的源代码的就好 通过java的一个包jsoup 就可以直接爬取了 后面有下载源代码 含jsoup包 的链接 输入 网页链接 输出 网页源代码 代码比较简单 解析都在代码中 import org jsoup Jsoup
  • python中lower函数是什么意思_python中lower函数实现方法及用法讲解

    之前小编介绍过python中将字符串小写字符转为大写的upper函数的使用方法 upper函数 有将小写转为大写的需要 那也有将大写转为小写的情况 本文主要介绍在python中可以将字符串大写自摸转换为小写字母的lower函数 1 lowe
  • CreateThread()和_beginthread()有什么不同?

    CreateThread 和 beginthread 有什么不同 我们知道在Windows下创建一个线程的方法有两种 一种就是调用Windows API CreateThread 来创建线程 另外一种就是调用MSVC CRT的函数 begi
  • Chart控件的用法和部署

    最近 导师让我做统计图 因为以前没有接触过 所以动起手来 难免会遇到很多的问题 经过一段时间的搜集资料和亲手操作 终于实现了我需要的功能 为了方便大家以后会更好的使用Chart控件和节省大家的时间 因此和大家分享一下我做的过程 首先 需要安
  • 时序数据的内存服务

    说明 既要坚定锻炼成熟架构的道路 也要在合理的范围内重塑设计 计算时序数据的特征 少不了 Rolling 类的操作 过去 直接采用pandas进行rolling 效率很不错 但是在实战应用时不太行 反思下来 离线的操作拓展困难 很多都是一次