前言
- 我们在python代码里时常会用shell调起其他进程执行
- 有时调起进程的不止一个,非常多,但又不希望他们一个一个跑,或全部一起跑,就需要用到线程池
- 但是有些进程之间有依赖关系,比如进程A必须完成后进程B才可以执行,还需要额外的逻辑保证顺序执行
- 某些场景下,需要调起的不仅有外部进程,还有自己写的某一段函数
- 为了发现执行时间过长的的任务,我们需要设置任务超时时间
- 需要调起的任务这么多,难免有些任务出错,为防止任务大面积报错,我们需要设置多少个任务出错后停止执行
- 任务出错一次可能是运气不好,我们需要设置重试次数,重试时间间隔
- 当一次调起中有任务失败时,可以清楚的看到日志,然后方便的重跑没执行成功的任务
思路
- 提到相互依赖的任务,一定会想到dag(有向无环图)
- 但是dag的点边关系配置复杂,每次新增节点就得为节点添加一条或多条线,这事在图上操作比较直观,但是在代码里非常捉急了
- 因此咱们重新设计dag表达方式,如下:
- 先用list定义一个3个任务的任务集,[1,2,3]
- 定义这个任务集为有序任务集:[True,1,2,3],即执行顺序1->2->3,当然,无序就是False,他们可以并行执行
- 多个任务集就形成嵌套,我们规定:嵌套中,相邻层的任务集[顺序性]相反,且这个True,False只能写在最外层,仅写一次。接下来,举几个例子你就明白了
- 先执行[True,1,2],再执行[False,3,4,5]
- [True,1,2]和[True,3,4]可以同时执行
使用
1.示例代码
from multiple_cmd import multiple_cmd
cmd_list=[True,['echo 1','echo 2'],['error',['echo 4','echo 5']],'echo 6']
multiple_cmd(max_exec=6,max_error=3,retry_times=1,retry_wait_second=1,time_out_second=9999,cmds=cmd_list).exec_task()
2.参数:
- max_exec=1: 最大并行的线程数
- time_out_second=999999: 超时错误
- max_error=1: 出错多少次时停止执行任务(值大等于>=1),正在执行的线程继续执行完,没执行的取消
- retry_times=0: 出错重试次数
- retry_wait_second=1: 重试时间间隔
- cmds=None: 任务列表,支持shell与函数,list[str|function],例:[True,‘cmd1’,[‘cmd2’,‘cmd3’],‘cmd4’],True有序,False无序,list的相邻层有序无序交替
3.等效dag:
4.执行结果
5.注意:执行shell或函数的返回值为0则成功,非0失败,执行函数时,函数体内不能使用sys.exit(status),推荐用返回值传递作业执行状态
进阶(函数任务)
1.示例代码
from utils.multiple_cmd import multiple_cmd
def func(a):
try:
if a!=3:
print(a)
else:
# 错误的退出方式,应该用 return 1
# sys.exit(1)
return 1
except Exception as e:
return 1
return 0
cmd_list=[False]
cmd_list.append([(func,{'a':1}),(func,{'a':2})])
cmd_list.append([(func,{'a':3}),(func,{'a':4})])
multiple_cmd(max_exec=1,max_error=0,retry_times=1,retry_wait_second=1,cmds=cmd_list).exec_task()
2.结果
3.注意:函数是需要自己代码控制日志打印的
源码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, wait
# 定义临时目录,用来放执行日志,按需改
import tmp_file_path
# 定义自己的日志打印方式,按需改
import writeLog
import time
import threading
import subprocess
class tree_node(object):
'''任务树节点
cmd_exec:执行任务
tmp_file_name:日志文件名
cmd:原始命令
is_finish:任务是否完成
'''
def __init__(self, cmd_exec, tmp_file_name: str, cmd: str, is_finish=True):
self.cmd_exec = cmd_exec
self.tmp_file_name = tmp_file_name
self.cmd = cmd
self.is_finish = is_finish
# 父节点集合
self.parent_nodes = []
def parent_is_finish(self):
'''父节点是否全部完成
'''
status = 1
for node in self.parent_nodes:
status &= node.is_finish
return status
class multiple_cmd(object):
'''
'''
def __init__(self, max_exec=1, time_out_second=999999, max_error=1, retry_times=0, retry_wait_second=1, cmds=None):
'''多线程执行工具(注意:执行shell或函数的返回值为0则成功,非0失败,执行函数时,函数体内不能使用sys.exit(status),推荐用返回值传递作业执行状态)
max_exec=1: 最大并行的线程数\n
time_out_second=999999: 超时错误\n
max_error=1: 出错多少次时停止执行任务(值大等于>=1),正在执行的线程继续执行完,没执行的取消\n
retry_times=0: 出错重试次数\n
retry_wait_second=1: 重试时间间隔\n
cmds=None: 任务列表,支持shell与函数,list[str|function],例:[True,'cmd1',['cmd2','cmd3'],'cmd4'],True有序,False无序,list的相邻层有序无序交替
'''
self.__max_error = max(max_error,1)
self.__time_out_second = time_out_second
self.__retry_times = retry_times
self.__retry_wait_second = retry_wait_second
# 锁
self.__lock = threading.Lock()
# 线程池
self.__executor = ThreadPoolExecutor(max_workers=max_exec)
# 当前执行的任务数
self.__exec_num = 0
# 第一层标志位:是否有序
self.__cmds = cmds
self.__is_order = self.__cmds[0]
self.__cmds.remove(self.__is_order)
# 任务数
self.__tasks_num = 0
# 根节点
self.__root_node = tree_node('root', 'root', 'root')
# 任务列表
self.__all_task = []
# handler列表
self.__handler_list = []
# 是否取消任务
self.__is_cancel = False
# 取消的任务
self.__cancel_list = []
# 成功的任务
self.__success_list = []
# 失败的任务
self.__fail_list = []
def cmd_parser_task(self, cmds: list, order: bool, root_nodes: list):
'''递归任务列表,生成表达依赖关系的n阶树\n
cmds:任务列表\n
order:当前层执行顺序\n
root_nodes:父节点
'''
tmp_node = root_nodes
parent_nodes = []
for i in range(len(cmds)):
if isinstance(cmds[i], str) or isinstance(cmds[i], tuple):
cmds[i] = self.parse_cmd_list(cmds[i])
cmds[i].parent_nodes = tmp_node
self.__all_task.append(cmds[i])
tmp_node = [cmds[i]] if order else tmp_node
parent_nodes.extend([cmds[i]])
elif isinstance(cmds[i], list) and len(cmds[i]) > 0:
node = self.cmd_parser_task(cmds[i], 1-order, tmp_node)
tmp_node = node if order else tmp_node
parent_nodes.extend(node)
else:
raise RuntimeError('命令参数含有非str,function,list的类型')
return parent_nodes[len(parent_nodes)-1:] if order else parent_nodes
def task_parser_cmd(self, cmds: list):
'''筛选未执行完成的命令,用以重跑\n
cmds:命令列表
'''
remove_list = [[], []]
for i in range(len(cmds)):
if isinstance(cmds[i], tree_node):
if cmds[i].is_finish:
remove_list[0].append(cmds[i])
else:
cmds[i] = cmds[i].cmd
elif isinstance(cmds[i], list) and len(cmds[i]) > 0:
self.task_parser_cmd(cmds[i])
if len(cmds[i]) == 0:
remove_list[1].append(cmds[i])
# 先清除已完成任务,然后清除空列表
for remove_type in remove_list:
for remove_item in remove_type:
cmds.remove(remove_item)
def cmd_generator(self):
'''根据命令的顺序和完成状态返回可执行命令
'''
result = []
all_task = self.__all_task.copy()
for task in all_task:
if task.parent_is_finish():
result.append(task)
self.__all_task.remove(task)
return result
def run(self, node: tree_node, retry_times: int, retry_wait_second: int, index: int):
'''线程函数\n
node:命令基础信息\n
retry_times:出错重试次数\n
retry_wait_second:重试时间间隔\n
index:任务序号
'''
if self.__retry_times == retry_times:
writeLog(f'任务{index}开始执行,日志:{node.tmp_file_name},命令:{node.cmd}')
status = self.run_task(node.cmd_exec)
# 定义状态码
status_info = {
'0': '正常',
'-1': '超时',
'1': '异常'
}
# 任务重试
while status != 0:
if retry_times > 0:
writeLog(f'任务{index}{status_info[str(status)]}')
time.sleep(retry_wait_second)
writeLog(f'任务{index}第{self.__retry_times-retry_times+1}次重试')
status = self.run_task(node.cmd_exec)
retry_times -= 1
else:
break
self.__lock.acquire()
try:
if status != 0:
self.__exec_num += 1
writeLog(
f'任务{index}执行失败,进度:{self.__exec_num}/{self.__tasks_num}')
self.__fail_list.append([node.tmp_file_name, node.cmd])
# 达到最大错误数,取消未执行的任务
self.__max_error -= 1
if self.__max_error == 0:
for task in self.__handler_list:
if task[2].cancel():
self.__cancel_list.append(task)
self.__is_cancel = True
for task in self.__all_task:
self.__cancel_list.append(task)
self.__all_task.clear()
writeLog(f'达到最大错误次数', 'error')
self.__exec_num += len(self.__cancel_list)
else:
self.__exec_num += 1
writeLog(
f'任务{index}执行成功,进度:{self.__exec_num}/{self.__tasks_num}')
self.__success_list.append([node.tmp_file_name, node.cmd])
node.is_finish = True
finally:
self.__lock.release()
def exec_task(self):
'''执行任务,输出结果
'''
# 获取任务执行句柄,生成任务编号
handlers, index = [], 1
# 生成任务列表
self.cmd_parser_task(self.__cmds, self.__is_order, [self.__root_node])
self.__tasks_num = len(self.__all_task)
# 每一个任务执行完或取消时,检查有无新的可执行任务,放入线程池
while not self.__is_cancel and len(self.__all_task) > 0:
self.__lock.acquire()
try:
tasks = self.cmd_generator()
# 如果当前没有正在执行的任务,却无法调启任务,死锁,退出
if self.__max_error > 0 and len(self.__all_task) != 0 and len(tasks) == 0 and len(self.__success_list)+len(self.__fail_list) == len(self.__handler_list):
self.__cancel_list.extend(self.__all_task)
self.__all_task.clear()
writeLog('依赖关系死锁', 'error')
break
finally:
self.__lock.release()
for task in tasks:
task_handler = self.__executor.submit(
self.run, task, self.__retry_times, self.__retry_wait_second, index)
index += 1
self.__handler_list.append(
[task.tmp_file_name, task.cmd, task_handler])
handlers.append(task_handler)
wait(handlers, return_when='FIRST_COMPLETED')
# 等待所有任务执行完成
wait(handlers)
return self.print_res()
def print_res(self):
'''打印结果
'''
writeLog('任务执行完成')
writeLog(f'成功{len(self.__success_list)}个')
writeLog(f'失败{len(self.__fail_list)}个')
writeLog(f'取消{len(self.__cancel_list)}个')
writeLog(f'失败日志文件:{str(self.__fail_list)}')
writeLog(f'成功日志文件:{str(self.__success_list)}')
# 生成重跑的命令
self.task_parser_cmd(self.__cmds)
# 加入执行顺序标志位
self.__cmds = [self.__is_order]+self.__cmds
writeLog(f'需要重跑的命令:{str(self.__cmds)}')
return len(self.__fail_list)
def parse_cmd_list(self, cmd):
'''生成列表,执行命令,日志文件,原始命令,是否完成\n
cmd:shell或函数
'''
# 获取当前时间戳拼接日志文件名
time_st = str(time.time()).replace('.', '')
tmp_file_name = tmp_file_path+time_st+'.log'
cmd_exec = f'{cmd} 1>{tmp_file_name} 2>&1'
is_finish = False
result = None
if isinstance(cmd, tuple):
result = tree_node(
cmd, '无', f'({cmd[0].__name__},{str(cmd[1])})', is_finish)
else:
result = tree_node(cmd_exec, tmp_file_name, cmd, is_finish)
return result
def run_task(self, cmd_exec):
'''执行task
cmd_exec:待执行的task
'''
code = 0
if isinstance(cmd_exec, tuple):
code = cmd_exec[0](**cmd_exec[1])
else:
try:
code = subprocess.call(
cmd_exec, timeout=self.__time_out_second, shell=True)
code = 1 if code != 0 else 0
except subprocess.TimeoutExpired:
code = -1
except subprocess.CalledProcessError:
code = 1
return code