建模杂谈系列215 ADBS Update V3

2023-11-13

说明

本来在完成这次量化实验之前不想再改版,但想想至少还有3个ADBS要在本次中完成,算下来改所花的时间还是少于因为不改而做的额外配置,所以还是改。

本次升级要解决几个问题:

  • 1 Redis Var的配置化管理
  • 2 Worker 的命令行配置(分发与非分发)

内容

1 Redis Var

目前看来,Redis Var有这么几种角色/功能:

  • 1 通用依赖。例如像时间轴这种的变量,每个worker都要使用。
  • 2 项目定制变量。每个项目都有自己流程的缓存变量,目前有一些项目的缓存还是不正确的,但是只是用于监控,也就放过了。

所以本次的改变是将所有的redis变量都集中在config文件里,有一些只和项目相关,使用上又是固定的变量,就直接用项目名称去格式化就好。

所以会涉及到所有的app和monitor,逐个扫过一遍,确保通用的redis_var会被集中管理。

2 Worker

Worker的分发模式(fetch)有助于各Worker进行并行执行,而顺序取(range)则可以保证每一条数据不会漏掉。

这两种模式区别非常明显,又都会用到,所以把这个参数通过执行命令的参数透到外面的命令行会比较灵活。

具体的执行Worker是逻辑性问题,不需要再修改。

3 操作

  • 1 先创建一个新的文件夹,用于存放新的模板文件
  • 2 以docker run -it 方式打开老的镜像

3.1 configs_base.py

先把最新的配置内容粘贴过来,然后逐个查看每个app中需要使用的redis变量。

# 通用Redis变量 - 默认都是本地redis
global_redis_var = {}
global_redis_var['redis_agent_host'] = redis_agent_host
global_redis_var['redis_connection_hash'] = redis_connection_hash
global_redis_var['some_var'] = None

# 项目用的appRedis变量 - 默认都是本地redis
app_redis_var = {}
app_redis_var['redis_agent_host'] = redis_agent_host
app_redis_var['redis_connection_hash'] = redis_connection_hash
app_redis_var['app01_PullToStep1MongoIn'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
app_redis_var['app03_PullToStep1MongoOut'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 监控用的Redis变量
monitor_redis_var = {}
monitor_redis_var['redis_agent_host'] = redis_agent_host
monitor_redis_var['redis_connection_hash'] = redis_connection_hash
monitor_redis_var['monitor03_Step1TasksClaimed'] ='BUFF.%s.step1_mongo_in.pf.app.monitor03_Step1TasksClaimed.af.gp.0.uf.last_stat' % project_name
monitor_redis_var['monitor04_StreamIn_count'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 数值型转换(如果声明的变量缺少流程就会中断)
app01_toDoubleVarList = []
app03_toDoubleVarList = []

然后是app需要使用的redis变量,这些应该都是项目相关的固定变量。目前看来,app01是需要redis变量的。

3.2 app01_PullToStep1MongoIn.py

先修改这个。需要使用redis var来记录入系统流量,同时也需要维持数值化变量的任务。如果可以全部是数值的话就不用管。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app01_config['toDoubleVarList']

..

import time 
redis_var =app_redis_var['app01_PullToStep1MongoIn']
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']

3.3 app03_PullToStep1MongoOut.py

app03并不需要提供其他的统计,无需redis变量,但是要增加对于数值型变量的转换功能。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app03_config['toDoubleVarList']

3.4 monitor01_StreamFlow.py

这个monitor主要是看项目默认的输入输出队列,不必改。

3.5 monitor02_DB_Recs.py

这个是看项目相关数据库的总条数,也不用改。

3.6 monitor03_Step1TasksClaimed.py

这个是监控入系统数据被分配的情况。为了减少Mongo的统计压力,采用了增量统计的方式,使用了一个redis变量记录上次的统计点。

这个也属于“项目间差异命名,项目内固定命名”的变量,所以也提出来,放在configs_base里定义,在这个程序体中直接引入就可以。

...

from configs_base import global_redis_var,monitor_redis_var

...

redis_var = monitor_redis_var['monitor03_Step1TasksClaimed']

3.7 monitor04_StreamIn_count.py

这是用于统计入系统流量的,同样需要修改redis变量。monitor04在上个版本已经改过,现在把redis_var的导入集中管理。

3.8 worker01_Go.py

之前会给这个worker01进行不同的命名,发现在多数情况下要定制的部分其实不多,所以可以做一个约定,将这个步骤的调度都命名为 worker01_Go.py,而对应的Worker,不管实现什么功能,都将对应的输出命名为: worker_af, worker_Chain_session_list 以及worker_Chain_session_dict。Worker的名称叫TheWorker.py

启动时,以argparse的方式对其进行调度,默认参数可以维持现状(这样就不必动sche.py)。

argparse的示例如下:

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode

    return is_fetch_mode

if __name__ =='__main__':
    is_fetch_mode = get_arg() or False
    if is_fetch_mode is not False:
        is_fetch_mode = True 
    print('is_fetch_mode :', is_fetch_mode)

调用:

root@3b2281835e89:/workspace# python3 test.py
is_fetch_mode : False
root@3b2281835e89:/workspace# python3 test.py --is_fetch_mode=xx
is_fetch_mode : True

重写worker.py

import os 

print('>>>work is Running  ')
runcode =''
for some_app in ['worker01_Go.py']:
    runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)

新建worker01_Go.py

...
import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return is_fetch_mode,worker_name,group_name


is_fetch_mode,worker_name,group_name = get_arg()

# 参数映射1
if is_fetch_mode is not None:
    is_fetch_mode = True 
else:
    is_fetch_mode = False
# 参数映射2
worker_name =worker_name or 'alice'

# 参数映射3
# 在init_projects.py定义
group_name =group_name or 'group1'

...
from TheWorker import worker_af, worker_Chain_session_list, worker_Chain_session_dict

这样worker01就可以固定下来不变了,以后每个项目中嵌入自己的worker就可以,worker中对应的实例、链和字典赋给要导入的变量。

如果worker没有定义会怎么样?

整个流是不会有影响的,使用worker是以os方式调用worker01的,如果worker01失败,主程序并不会出错。

3.9 CNT_worker.py

这类调度主要是在冷启动的时候加速处理,启动一定期限的不间断循环,工作结束时对应的容器也会自动销毁。

import os 

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--cnt_limit')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    cnt_limit = args.cnt_limit
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return cnt_limit,is_fetch_mode,worker_name,group_name


cnt_limit,is_fetch_mode,worker_name,group_name = get_arg()

cnt_limit = cnt_limit or 10000
cnt_limit = int(cnt_limit)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''


aleary_run = 0

print('>>>work is Running  ')
for _ in range(cnt_limit):
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

3.10 Band_worker.py

这类调度主要是为了增加的实时数据的处理,在时间内一直循环,时间到时对应的容器也会自动销毁。Band_worker因为会常态运行,所以加一个参数,让其稍微停顿一下,一般一秒一次。

import os 

import time
def get_time_str1(ts = None,bias_hours=0):
    ts = ts or time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600))

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--to_dt')
    parser.add_argument('--pace')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    to_dt = args.to_dt
    pace = args.pace
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return to_dt,pace,is_fetch_mode,worker_name,group_name


to_dt,pace,is_fetch_mode,worker_name,group_name = get_arg()

to_dt = to_dt or '2099-01-01 00:00:00'

pace = pace or 10000
pace = int(pace)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''



aleary_run = 0

print('>>>work is Running  ')
while True:
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

    if get_time_str1() >= to_dt:
        break
    if pace > 0:
        time.sleep(pace)

4 提交更改,生成新镜像

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

建模杂谈系列215 ADBS Update V3 的相关文章

  • Redis部署配置-主从复制

    目前我有两台服务器 我已经部署了基于node js Express JS的Web服务API 我正在使用 Redis 来缓存 JSON 字符串 将此设置部署到生产中的最佳选择是什么 我懂了here https stackoverflow co
  • 如何清理redis中不活跃的玩家?

    我正在制作一个使用 redis 来存储游戏状态的游戏 它可以很好地跟踪位置和玩家 但我没有一个好的方法来清理不活跃的玩家 每当玩家移动时 这是一个半慢速移动游戏 想想每秒 1 5 帧 我就会用新位置更新哈希并删除旧位置键 跟踪活跃玩家的最佳
  • Docker&Celery - 错误:Pidfile (celerybeat.pid) 已存在

    应用程序包括 姜戈 雷迪斯 芹菜 码头工人 Postgres 在将项目合并到 docker 之前 一切都运行顺利且正常 但是一旦将其移入容器 就开始出现问题 起初它开始得很好 但过了一会儿我确实收到了以下错误 celery beat 1 E
  • python 3.5 中的 json.loads 和 Redis

    我使用 json dumps 创建了一个 JSON 对象 并在 Redis 列表中将其 RPUSH ed 当使用 LRANGE redis lrange 返回 JSON 时 我收到一个二进制字符串 b si 00 ff 所以 json lo
  • redis集群不断打印日志WSA_IO_PENDING

    当我启动redis集群的所有redis服务器时 所有这些服务器不断打印类似WSA IO PENDING clusterWriteDone的日志 9956 03 Feb 18 17 25 044 WSA IO PENDING writing
  • 如何使用Spring Cache处理redis异常?

    我目前正在开发一个包含 Spring Data Redis 和 Spring Cache 的项目 在spring data redis中 我使用redis模板调用redis 我在 try catch 块中处理 redis 模板抛出的所有异常
  • 如何在节点redis客户端上设置读取超时?

    在 github 上我没有看到读取超时的选项 https github com NodeRedis node redis https github com NodeRedis node redis There s connect timeo
  • 库存管理系统的 SQL 与 NoSQL

    我正在开发一个基于 JAVA 的网络应用程序 主要目的是拥有在多个称为渠道的网站上销售的产品的库存 我们将担任所有这些渠道的管理者 我们需要的是 用于管理每个渠道的库存更新的队列 库存表 其中包含每个通道上分配的正确快照 将会话 ID 和其
  • Spring Data Redis - Lettuce连接池设置

    尝试在 spring data redis 环境中设置 Lettuce 连接池 下面是代码 Bean LettuceConnectionFactory redisConnectionFactory GenericObjectPoolConf
  • Redis INCRBY 有限制

    我想知道是否有一种方法可以通过我的应用程序的单次往返在 Redis 中执行此操作 对于给定的键K 其可能值V是范围内的任意整数 A B 基本上 它有上限和下限 When an INCRBY or DECRBY发出命令 例如INCRBY ke
  • 从redis中检索大数据集

    一台服务器上的应用程序查询另一台服务器上运行的 Redis 查询的结果数据集约为 250kzrangebyscore objects locations inf inf这在应用程序服务器上似乎需要 40 秒 当使用命令执行时redis cl
  • Caffeine Expiry 中如何设置多个过期标准?

    我正在使用 Caffeine v2 8 5 我想创建一个具有可变到期时间的缓存 基于 值的创建 更新以及 该值的最后一次访问 读取 无论先发生什么都应该触发该条目的删除 缓存将成为三层值解析的一部分 The key is present i
  • 使用Redis从有限范围内生成唯一ID

    我有一些数据库项目 除了主键之外 还需要项目所属组的唯一索引 我们来调用属性nbr 以及将项目分组在一起并定义唯一范围的属性nbr 我们会打电话group This nbr必须在 1 N 范围内 并且may从外部源导入项目时进行设置 由于所
  • 如何设置和获取Redis中存储的对象?

    我试图在 redis 中存储一个对象 当我获取该对象时 它似乎不起作用 I tried u User new u name blankman redis set test u x redis get test x name error 我想
  • Lua中按字符分割字符串

    我有像这样的字符串 ABC DEF 我需要将它们分开 字符并将两个部分分别分配给一个变量 在 Ruby 中 我会这样做 a b ABC DEF split 显然Lua没有这么简单的方法 经过一番挖掘后 我找不到一种简短的方法来实现我所追求的
  • 使用 Sentinels 升级 Redis 的最佳实践?

    我有 3 个 Redis 节点 由 3 个哨兵监视 我进行了搜索 文档似乎不清楚如何最好地升级此类配置 我目前使用的是 3 0 6 版本 我想升级到最新的 5 0 5 我对这方面的程序有几个疑问 升级两个大版本可以吗 我在我们的暂存环境中执
  • 2 个具有共享 Redis 依赖的 Helm Chart

    目前 我有 2 个 Helm Charts Chart A 和 Chart B Chart A 和 Chart B 对 Redis 实例具有相同的依赖关系 如Chart yaml file dependencies name redis v
  • Redis是如何实现高吞吐量和高性能的?

    我知道这是一个非常普遍的问题 但是 我想了解允许 Redis 或 MemCached Cassandra 等缓存 以惊人的性能极限工作的主要架构决策是什么 如何维持连接 连接是 TCP 还是 HTTP 我知道它完全是用C写的 内存是如何管理
  • 为什么Redis中没有有序的hashmap?

    Redis 数据类型 http redis io topics data types包括排序集 http redis io topics data types intro sorted sets以及其他用于键值存储的必要数据结构 但我想知道
  • Scala 使用的 Redis 客户端库建议

    我正在计划使用 Scala 中的 Redis 实例进行一些工作 并正在寻找有关使用哪些客户端库的建议 理想情况下 如果存在一个好的库 我希望有一个为 Scala 而不是 Java 设计的库 但如果现在这是更好的方法 那么仅使用 Java 客

随机推荐

  • 取消GL.iNet路由器视频的密码

    每次路由器访问192 168 8 1 8083 action stream时总是无法访问 但是先进入192 168 8 1登录以后再去刷新视频就可以出来 即使取消外网登录验证也还是没效果 最后发现广大网友的意见是重新刷固件 先去GL iNe
  • 计算机专业毕业设计演示视频(论文+系统)_kaic

    https gongkailuxiangdu oss cn beijing aliyuncs com lx jsp 20 70912jspm E6 88 BF E5 B1 8B E9 94 80 E5 94 AE E7 AE A1 E7 9
  • 树莓派命令行显示乱码及异地组网问题

    写了一千多字没保存 很生气 这一条简写 命令行显示异常 首先检查树莓派设置里的地区 时区设置 一律改为中国 随后重要原因就是字库不全问题 命令行输入 sudo apt get install ttf way zenhei 一路确定安装字体
  • 程序的链接的三种方式

    程序的链接有以下三种方式 静态链接 在程序运行之前 先将各目标模块及它们所需的库函数链接成一个完整的可执行程序 以后不再拆开 装入时动态链接 将用户源程序编译后所得到的一组目标模块 在装入内存时 釆用边装入边链接的链接方式 运行时动态链接
  • 使用matlab进行灵敏性分析(附源代码)

    调用单纯形程序 function x z flg sgma simplexfun A A1 b c m n n1 cb xx A b are the matric in Ax b c is the matrix in max z cx A1
  • ChatGPT实现代码生成

    代码生成 就代码生成而言 ChatGPT 是一款卓越的工具 它为开发者提供强大的功能 ChatGPT 可以运用其出色的自然语言处理技术 深入理解和解释开发者的需求 快速生成适合的代码片段 对于那些繁琐的任务或者重复的代码 ChatGPT 能
  • 试题 C: 刷题统计

    题目链接 点击跳转 题目描述 小明决定从下周一开始努力刷题准备蓝桥杯竞赛 他计划周一至周五每天做 a 道题目 周六和周日每天做 b 道题目 请你帮小明计算 按照计划他将在第几天实现做题数大于等于 n 题 输入格式 输入一行包含三个整数 a
  • 系统资源占用高排查手段

    1 cpu高排查思路 1 top d 1每秒打印进程所占cpu资源 然后再按h显示线程占用 2 strace跟踪strace p 线程号 会打印该线程主要做什么操作 2 io高排查思路 lsof是一个展现的是当前系统所有进程 不是线程 打开
  • 端午过后公司面了一个字节来的要求月薪23K,明显感觉他背了很多面试题...

    最近有朋友去字节面试 面试前后进行了20天左右 包含4轮电话面试 1轮笔试 1轮主管视频面试 1轮hr视频面试 据他所说 80 的人都会栽在第一轮面试 要不是他面试前做足准备 估计都坚持不完后面几轮面试 其实 第一轮的电话面试除了一些常规的
  • Redis数据结构——QuickList、SkipList、RedisObjective

    承接上文 本文主要介绍QuickList SkipList RedisObjective 四 Redis数据结构 QuickList 问题1 ZipList虽然节省内存 但申请内存必须是连续空间 如果内存占用较多 申请内存效率很低 怎么办
  • ObjectArx 学习笔记(一)--入口函数acrxEntryPoint

    参考资料 AutoCAD 2000 ARX二次开发实例精粹 1 Arx程序的初始化 新建完工程之后 Arx程序的初始化在acrxEntryPoint 函数的AcRx kInitAppMsg事件中 或该事件调用的函数中进行 例如InitApp
  • 【PS】高低频磨皮

    一 原理 将皮肤纹理的信息储存在高频的图层中 将皮肤颜色的信息储存在低频的图层中 从而分开皮肤的颜色和纹理 达到快速修复皮肤的效果 二 步骤 1 建立高低频图层 2 低频图层 3 高频图层 图像 应用图像 混合模式改为线性光
  • 以http协议实现onvif协议并完成对IPC摄像头的监控

    文章目录 目录 文章目录 前言 1实现http连接 2 获取设备编码参数 3 设置摄像头相关参数 总结 前言 因为工作上的原因 需要接入IPC摄像头 实现监控功能 因而开始了对于IPC摄像头的学习之路 因为要做到通用 所以目光直接锁定了on
  • python爬虫增加多线程获取数据

    Python爬虫应用领域广泛 并且在数据爬取领域处于霸主位置 并且拥有很多性能好的框架 像Scrapy Request BeautifuSoap urlib等框架可以实现爬行自如的功能 只要有能爬取的数据 Python爬虫均可实现 数据信息
  • 国产数据库产品清单

    01 提到国产数据库 圈儿内的朋友多数会说出国产数据库 四大家族 达梦 金仓 南大 神通 那么除了这四家 你还是否还了解其他的国产数据库产品 随着国内信息技术的快速发展 以及近几年去 O 的强势浪潮 在国内各数据库厂商的不断努力下 国产数据
  • 区块链实验室(14) - 编译FISCO-BCOS

    FISCO BCOS是一种区块链平台 与Hyperledger和Ethereum有些不同 详见FISCO BCOS 区块链 编译FISCO BCOS源码的目的是修改或者新增其中功能模块 进行对比实验 验证新想法 新创意的效果 编译的步骤很简
  • SQLi-Labs 学习笔记(Less 41-50)

    点击打开链接 Less 41 基于错误的POST型单引号字符型注入 先打开网页查看 Welcome Dhakkan 与之前讲的Less 40的区别 plain view plain copy sql SELECT FROM users WH
  • WPF性能优化经验总结

    原文地址 WPF性能优化经验总结 痴鸟 博客园 WPF性能优化一 Rendering Tier 1 根据硬件配置的不同 WPF采用不同的Rendering Tier做渲染 下列情况请特别注意 因为在这些情况下 即使是处于Rendering
  • 服务器添加网卡

    原因 因为网络原因服务器需要添加网卡 1 确定主板卡槽 是否可以添加网卡 2 命令ip a 查看现有网卡 3 命令 cd etc sysconfig network scripts 查看文件列表 enpls0 网卡对应文件ifcfg enp
  • 建模杂谈系列215 ADBS Update V3

    说明 本来在完成这次量化实验之前不想再改版 但想想至少还有3个ADBS要在本次中完成 算下来改所花的时间还是少于因为不改而做的额外配置 所以还是改 本次升级要解决几个问题 1 Redis Var的配置化管理 2 Worker 的命令行配置