shell执行神器,赶紧来围观(python,附源码)

2023-11-09

前言

  • 我们在python代码里时常会用shell调起其他进程执行
  • 有时调起进程的不止一个,非常多,但又不希望他们一个一个跑,或全部一起跑,就需要用到线程池
  • 但是有些进程之间有依赖关系,比如进程A必须完成后进程B才可以执行,还需要额外的逻辑保证顺序执行
  • 某些场景下,需要调起的不仅有外部进程,还有自己写的某一段函数
  • 为了发现执行时间过长的的任务,我们需要设置任务超时时间
  • 需要调起的任务这么多,难免有些任务出错,为防止任务大面积报错,我们需要设置多少个任务出错后停止执行
  • 任务出错一次可能是运气不好,我们需要设置重试次数,重试时间间隔
  • 当一次调起中有任务失败时,可以清楚的看到日志,然后方便的重跑没执行成功的任务

思路

  • 提到相互依赖的任务,一定会想到dag(有向无环图)
  • 但是dag的点边关系配置复杂,每次新增节点就得为节点添加一条或多条线,这事在图上操作比较直观,但是在代码里非常捉急了
  • 因此咱们重新设计dag表达方式,如下:
    • 先用list定义一个3个任务的任务集,[1,2,3]
    • 定义这个任务集为有序任务集:[True,1,2,3],即执行顺序1->2->3,当然,无序就是False,他们可以并行执行
    • 多个任务集就形成嵌套,我们规定:嵌套中,相邻层的任务集[顺序性]相反,且这个True,False只能写在最外层,仅写一次。接下来,举几个例子你就明白了
      • 先执行[True,1,2],再执行[False,3,4,5]
        • 写法:[True,1,2,[3,4,5]]
      • [True,1,2]和[True,3,4]可以同时执行
        • 写法:[False,[1,2],[3,4]]

使用

1.示例代码


from multiple_cmd import multiple_cmd

cmd_list=[True,['echo 1','echo 2'],['error',['echo 4','echo 5']],'echo 6']

multiple_cmd(max_exec=6,max_error=3,retry_times=1,retry_wait_second=1,time_out_second=9999,cmds=cmd_list).exec_task()

2.参数:

  • max_exec=1: 最大并行的线程数
  • time_out_second=999999: 超时错误
  • max_error=1: 出错多少次时停止执行任务(值大等于>=1),正在执行的线程继续执行完,没执行的取消
  • retry_times=0: 出错重试次数
  • retry_wait_second=1: 重试时间间隔
  • cmds=None: 任务列表,支持shell与函数,list[str|function],例:[True,‘cmd1’,[‘cmd2’,‘cmd3’],‘cmd4’],True有序,False无序,list的相邻层有序无序交替

3.等效dag:
(img-OpaaJi4U-1617091445575)(../../asset/命令行批处理.jpg)]
4.执行结果
在这里插入图片描述
5.注意:执行shell或函数的返回值为0则成功,非0失败,执行函数时,函数体内不能使用sys.exit(status),推荐用返回值传递作业执行状态

进阶(函数任务)

1.示例代码


from utils.multiple_cmd import multiple_cmd

def func(a):
    try:
        if a!=3:
            print(a)
        else:
            # 错误的退出方式,应该用 return 1
            # sys.exit(1)
            return 1
    except Exception as e:
        return 1
    return 0
cmd_list=[False] 
cmd_list.append([(func,{'a':1}),(func,{'a':2})])
cmd_list.append([(func,{'a':3}),(func,{'a':4})])
multiple_cmd(max_exec=1,max_error=0,retry_times=1,retry_wait_second=1,cmds=cmd_list).exec_task()

2.结果
在这里插入图片描述
3.注意:函数是需要自己代码控制日志打印的

源码


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from concurrent.futures import ThreadPoolExecutor, wait
# 定义临时目录,用来放执行日志,按需改
import tmp_file_path
# 定义自己的日志打印方式,按需改
import writeLog
import time
import threading
import subprocess


class tree_node(object):
    '''任务树节点
    cmd_exec:执行任务
    tmp_file_name:日志文件名
    cmd:原始命令
    is_finish:任务是否完成
    '''

    def __init__(self, cmd_exec, tmp_file_name: str, cmd: str, is_finish=True):
        self.cmd_exec = cmd_exec
        self.tmp_file_name = tmp_file_name
        self.cmd = cmd
        self.is_finish = is_finish

        # 父节点集合
        self.parent_nodes = []

    def parent_is_finish(self):
        '''父节点是否全部完成
        '''
        status = 1
        for node in self.parent_nodes:
            status &= node.is_finish
        return status


class multiple_cmd(object):
    '''
    '''

    def __init__(self, max_exec=1, time_out_second=999999, max_error=1, retry_times=0, retry_wait_second=1, cmds=None):
        '''多线程执行工具(注意:执行shell或函数的返回值为0则成功,非0失败,执行函数时,函数体内不能使用sys.exit(status),推荐用返回值传递作业执行状态)
        max_exec=1: 最大并行的线程数\n
        time_out_second=999999: 超时错误\n
        max_error=1: 出错多少次时停止执行任务(值大等于>=1),正在执行的线程继续执行完,没执行的取消\n
        retry_times=0: 出错重试次数\n
        retry_wait_second=1: 重试时间间隔\n
        cmds=None: 任务列表,支持shell与函数,list[str|function],例:[True,'cmd1',['cmd2','cmd3'],'cmd4'],True有序,False无序,list的相邻层有序无序交替
        '''

        self.__max_error = max(max_error,1)
        self.__time_out_second = time_out_second
        self.__retry_times = retry_times
        self.__retry_wait_second = retry_wait_second
        # 锁
        self.__lock = threading.Lock()
        # 线程池
        self.__executor = ThreadPoolExecutor(max_workers=max_exec)

        # 当前执行的任务数
        self.__exec_num = 0

        # 第一层标志位:是否有序
        self.__cmds = cmds
        self.__is_order = self.__cmds[0]
        self.__cmds.remove(self.__is_order)
        # 任务数
        self.__tasks_num = 0
        # 根节点
        self.__root_node = tree_node('root', 'root', 'root')
        # 任务列表
        self.__all_task = []
        # handler列表
        self.__handler_list = []
        # 是否取消任务
        self.__is_cancel = False
        # 取消的任务
        self.__cancel_list = []
        # 成功的任务
        self.__success_list = []
        # 失败的任务
        self.__fail_list = []

    def cmd_parser_task(self, cmds: list, order: bool, root_nodes: list):
        '''递归任务列表,生成表达依赖关系的n阶树\n
        cmds:任务列表\n
        order:当前层执行顺序\n
        root_nodes:父节点
        '''
        tmp_node = root_nodes
        parent_nodes = []
        for i in range(len(cmds)):
            if isinstance(cmds[i], str) or isinstance(cmds[i], tuple):
                cmds[i] = self.parse_cmd_list(cmds[i])
                cmds[i].parent_nodes = tmp_node
                self.__all_task.append(cmds[i])
                tmp_node = [cmds[i]] if order else tmp_node
                parent_nodes.extend([cmds[i]])
            elif isinstance(cmds[i], list) and len(cmds[i]) > 0:
                node = self.cmd_parser_task(cmds[i], 1-order, tmp_node)
                tmp_node = node if order else tmp_node
                parent_nodes.extend(node)
            else:
                raise RuntimeError('命令参数含有非str,function,list的类型')
        return parent_nodes[len(parent_nodes)-1:] if order else parent_nodes

    def task_parser_cmd(self, cmds: list):
        '''筛选未执行完成的命令,用以重跑\n
        cmds:命令列表
        '''
        remove_list = [[], []]
        for i in range(len(cmds)):
            if isinstance(cmds[i], tree_node):
                if cmds[i].is_finish:
                    remove_list[0].append(cmds[i])
                else:
                    cmds[i] = cmds[i].cmd
            elif isinstance(cmds[i], list) and len(cmds[i]) > 0:
                self.task_parser_cmd(cmds[i])
                if len(cmds[i]) == 0:
                    remove_list[1].append(cmds[i])
        # 先清除已完成任务,然后清除空列表
        for remove_type in remove_list:
            for remove_item in remove_type:
                cmds.remove(remove_item)

    def cmd_generator(self):
        '''根据命令的顺序和完成状态返回可执行命令
        '''
        result = []
        all_task = self.__all_task.copy()
        for task in all_task:
            if task.parent_is_finish():
                result.append(task)
                self.__all_task.remove(task)
        return result

    def run(self, node: tree_node, retry_times: int, retry_wait_second: int, index: int):
        '''线程函数\n
        node:命令基础信息\n
        retry_times:出错重试次数\n
        retry_wait_second:重试时间间隔\n
        index:任务序号
        '''
        if self.__retry_times == retry_times:
            writeLog(f'任务{index}开始执行,日志:{node.tmp_file_name},命令:{node.cmd}')
        status = self.run_task(node.cmd_exec)
        # 定义状态码
        status_info = {
            '0': '正常',
            '-1': '超时',
            '1': '异常'
        }
        # 任务重试
        while status != 0:
            if retry_times > 0:
                writeLog(f'任务{index}{status_info[str(status)]}')
                time.sleep(retry_wait_second)
                writeLog(f'任务{index}{self.__retry_times-retry_times+1}次重试')
                status = self.run_task(node.cmd_exec)
                retry_times -= 1
            else:
                break
        self.__lock.acquire()
        try:
            if status != 0:
                self.__exec_num += 1
                writeLog(
                    f'任务{index}执行失败,进度:{self.__exec_num}/{self.__tasks_num}')
                self.__fail_list.append([node.tmp_file_name, node.cmd])
                # 达到最大错误数,取消未执行的任务
                self.__max_error -= 1
                if self.__max_error == 0:
                    for task in self.__handler_list:
                        if task[2].cancel():
                            self.__cancel_list.append(task)
                    self.__is_cancel = True
                    for task in self.__all_task:
                        self.__cancel_list.append(task)
                    self.__all_task.clear()
                    writeLog(f'达到最大错误次数', 'error')
                    self.__exec_num += len(self.__cancel_list)
            else:
                self.__exec_num += 1
                writeLog(
                    f'任务{index}执行成功,进度:{self.__exec_num}/{self.__tasks_num}')
                self.__success_list.append([node.tmp_file_name, node.cmd])
                node.is_finish = True
        finally:
            self.__lock.release()

    def exec_task(self):
        '''执行任务,输出结果
        '''
        # 获取任务执行句柄,生成任务编号
        handlers, index = [], 1
        # 生成任务列表
        self.cmd_parser_task(self.__cmds, self.__is_order, [self.__root_node])
        self.__tasks_num = len(self.__all_task)
        # 每一个任务执行完或取消时,检查有无新的可执行任务,放入线程池
        while not self.__is_cancel and len(self.__all_task) > 0:
            self.__lock.acquire()
            try:
                tasks = self.cmd_generator()
                # 如果当前没有正在执行的任务,却无法调启任务,死锁,退出
                if self.__max_error > 0 and len(self.__all_task) != 0 and len(tasks) == 0 and len(self.__success_list)+len(self.__fail_list) == len(self.__handler_list):
                    self.__cancel_list.extend(self.__all_task)
                    self.__all_task.clear()
                    writeLog('依赖关系死锁', 'error')
                    break
            finally:
                self.__lock.release()
            for task in tasks:
                task_handler = self.__executor.submit(
                    self.run, task, self.__retry_times, self.__retry_wait_second, index)
                index += 1
                self.__handler_list.append(
                    [task.tmp_file_name, task.cmd, task_handler])
                handlers.append(task_handler)
            wait(handlers, return_when='FIRST_COMPLETED')
        # 等待所有任务执行完成
        wait(handlers)
        return self.print_res()

    def print_res(self):
        '''打印结果
        '''
        writeLog('任务执行完成')
        writeLog(f'成功{len(self.__success_list)}个')
        writeLog(f'失败{len(self.__fail_list)}个')
        writeLog(f'取消{len(self.__cancel_list)}个')
        writeLog(f'失败日志文件:{str(self.__fail_list)}')
        writeLog(f'成功日志文件:{str(self.__success_list)}')

        # 生成重跑的命令
        self.task_parser_cmd(self.__cmds)
        # 加入执行顺序标志位
        self.__cmds = [self.__is_order]+self.__cmds
        writeLog(f'需要重跑的命令:{str(self.__cmds)}')
        return len(self.__fail_list)

    def parse_cmd_list(self, cmd):
        '''生成列表,执行命令,日志文件,原始命令,是否完成\n
        cmd:shell或函数
        '''
        # 获取当前时间戳拼接日志文件名
        time_st = str(time.time()).replace('.', '')
        tmp_file_name = tmp_file_path+time_st+'.log'
        cmd_exec = f'{cmd} 1>{tmp_file_name} 2>&1'
        is_finish = False
        result = None
        if isinstance(cmd, tuple):
            result = tree_node(
                cmd, '无', f'({cmd[0].__name__},{str(cmd[1])})', is_finish)
        else:
            result = tree_node(cmd_exec, tmp_file_name, cmd, is_finish)
        return result

    def run_task(self, cmd_exec):
        '''执行task
        cmd_exec:待执行的task
        '''
        code = 0

        if isinstance(cmd_exec, tuple):
            code = cmd_exec[0](**cmd_exec[1])
        else:
            try:
                code = subprocess.call(
                    cmd_exec, timeout=self.__time_out_second, shell=True)
                code = 1 if code != 0 else 0
            except subprocess.TimeoutExpired:
                code = -1
            except subprocess.CalledProcessError:
                code = 1
        return code



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

shell执行神器,赶紧来围观(python,附源码) 的相关文章

随机推荐