同时读取子进程的 stdout 和 stderr

2024-02-04

我正在尝试在 Python 中运行一个冗长的命令,输出到 stdout 和 stderr。我想轮询子进程并将输出写入单独的文件。

根据这个答案,我尝试了以下操作python 中 subprocess.PIPE 的非阻塞读取 https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python

import subprocess

from Queue import Queue, Empty
from threading import Thread

def send_cmd(cmd, shell=False):
    """
    Send cmd to the shell
    """
    if not isinstance(cmd, list): cmd = shlex.split(cmd)

    params = {'args'   : cmd,
              'stdout' : subprocess.PIPE,
              'stderr' : subprocess.PIPE,
              'shell'  : shell}

    proc = subprocess.Popen(**params)

    return proc

def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
    """
    Monitor the process that is running, and log it if desired
    """
    def enqueue_output(out, queue):
        for line in iter(out.readline, b''):
            queue.put(line)

    def setup_process(log_name, proc):
        FID = open(log_name, 'w')
        queue = Queue()
        thread = Thread(target=enqueue_output, args=(proc, queue))
        thread.daemon = True # Thread dies with program
        thread.start()

        return (queue, FID)

    def check_queues(queue_list, errors):
        for queue, FID in queue_list:
            try:
                line = queue.get_nowait()
                if 'error' in line.lower() or 'failed' in line.lower():
                    errors.append(line)
            except Empty:
                pass
            else:
                FID.write(line)

    errors = []
    queue_list = []

    for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
        queue_list.append(setup_process(log, proc)

    while process.poll() is None:
        check_queues(queue_list, errors)

    while not queue_list[0][0].empty() or queue_list[1][0].empty():
        check_queues(queue_list, errors)

    for queue, FID in queue_list:
        FID.close()

return errors

process = send_cmd('long_program.exe')
errors  = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')

但如果 stdout 的输出文件是空的,而 stderr 的输出文件只有几行长,而两者都应该很大。

我缺少什么?


我这样做过一次..这是我写的一些旧代码



class Process_Communicator():

    def join(self):
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()
        self.ti.join()

    def enqueue_in(self):
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')
            pass

    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)
        #    out.flush()

    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        while (self.running):
            self.update()
        self.update()

    def update(self):
        line = ""
        try:
            while self.qe.not_empty:
                line = self.qe.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_err += line
        except Queue.Empty:
            pass

        line = ""
        try:
            while self.qo.not_empty:
                line = self.qo.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_out += line
        except Queue.Empty:
            pass

        while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s))

    def get_stdout(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def get_stderr(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stderr(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.'''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output,
                                    args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()

        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()

        self.stdin_queue = Queue.Queue()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
        pass

您可能不需要整个示例,但请随意剪切复制并粘贴您需要的内容。展示我如何进行线程处理也很重要。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

同时读取子进程的 stdout 和 stderr 的相关文章

  • 当我有自定义身份验证模型时,如何登录 Django Rest 可浏览 API?

    我有一个自定义用户模型 如下所示account models py from django contrib auth modles import AbstractUser from django db models signals impo
  • 使用python查找txt文件中字母出现的次数

    我需要从 txt 文件中读取该字母并打印 txt 文件中出现的次数 到目前为止 我已经能够在一行中打印内容 但计数有问题 有人可以指导吗 infile open grades txt content infile read for char
  • Virtualenv 在 OS X Yosemite 上失败并出现 OSError

    我最近更新到 OSX Yosemite 现在无法使用virtualenv pip 每当我执行 virtualenv env 它抛出一个 OSError Command Users administrator ux env bin pytho
  • 使用 Django 将文件异步上传到 Amazon S3

    我使用此文件存储引擎在上传文件时将文件存储到 Amazon S3 http code welldev org django storages wiki Home http code welldev org django storages w
  • 使用python从gst管道抓取帧到opencv

    我在用着OpenCV http opencv org 和GStreamer0 10 我使用此管道通过自定义套接字通过 UDP 接收 MPEG ts 数据包sockfd由 python 提供并显示它xvimagesink 而且效果很好 以下命
  • 类型错误:需要二进制或 unicode 字符串,得到 618.0

    I ve been trying to implement this ML Linear Model into my dataset https www tensorflow org tutorials estimator linear L
  • Python Tkinter 模块不显示输出

    我正在尝试学习 Python 并尝试使用 Python 中的 GUI 并遇到了这个 Tkinter 模块 我的代码运行 但运行时窗口没有出现 我的代码如下 from Tkinter import to create a root windo
  • Python Anaconda:如何测试更新的库是否与我现有的代码兼容?

    我在 Windows 7 机器上使用 Python 2 7 Anaconda 安装进行数据分析和科学计算 当新的库发布时 例如新版本的 pandas patsy 等 您建议我如何测试新版本与现有代码的兼容性 是否可以在同一台机器上安装两个
  • Python将文本文件解析为嵌套字典

    考虑以下数据结构 HEADER1 key value key value HEADER2 key value key value HEADER3 key value HEADER4 key value key value 原始数据中没有缩进
  • pandas 相当于 np.where

    np where具有向量化 if else 的语义 类似于 Apache Spark 的when otherwise数据帧方法 我知道我可以使用np where on pandas Series but pandas通常定义自己的 API
  • 如何查找或安装适用于 Python 的主题 tkinter ttk

    过去 3 个月我一直在制作一个机器人 仅用代码就可以完美运行 现在我的下一个目标是为它制作一个 GUI 但是我发现了一些障碍 主要的一个是能够看起来不像一个 30 年前的程序 我使用的是 Windows 7 我仅使用 Python 3 3
  • Ubuntu systemd 自定义服务因 python 脚本而失败

    希望获得有关 Ubuntu 中的 systemd 守护进程服务的一些帮助 我写了一个 python 脚本来禁用 Dell XPS 上的触摸屏 这更像是一个问题 而不是一个有用的功能 该脚本可以工作 但我不想一直启动它 这就是为什么我想到编写
  • Airflow 1.9 - 无法将日志写入 s3

    我在 aws 的 kubernetes 中运行气流 1 9 我希望将日志发送到 s3 因为气流容器本身的寿命并不长 我已经阅读了描述该过程的各种线程和文档 但我仍然无法让它工作 首先是一个测试 向我证明 s3 配置和权限是有效的 这是在我们
  • 在骨架图像中查找线 OpenCV python

    我有以下图片 我想找到一些线来进行一些计算 平均长度等 我尝试使用HoughLinesP 但它找不到线 我能怎么做 这是我的代码 sk skeleton mask rows cols sk shape imgOut np zeros row
  • 可以使用哪些技术来衡量 pandas/numpy 解决方案的性能

    Question 如何简洁全面地衡量下面各个功能的性能 Example 考虑数据框df df pd DataFrame Group list QLCKPXNLNTIXAWYMWACA Value 29 52 71 51 45 76 68 6
  • minizinc python 安装

    我通过 anaconda 提示符在 python 上安装了 minizinc 就像其他软件包一样 pip install minizinc 该软件包表示已成功安装 我可以导入该模块 但是 我正在遵循基本示例https minizinc py
  • 替换文件中的字符串

    我正在寻找一种方法来替换文件中的字符串而不将整个文件读入内存 通常我会使用 Reader 和 Writer 即如下所示 public static void replace String oldstring String newstring
  • 如何获取pandas中groupby对象中的组数?

    我想知道有多少个独特的组需要执行计算 给定一个名为 groupby 的对象dfgroup 我们如何找到组的数量 简单 快速 Pandaic ngroups 较新版本的 groupby API pandas gt 0 23 提供了此 未记录的
  • 将 Keras 集成到 SKLearn 管道?

    我有一个 sklearn 管道 对异构数据类型 布尔 分类 数字 文本 执行特征工程 并想尝试使用神经网络作为我的学习算法来拟合模型 我遇到了输入数据形状的一些问题 我想知道我想做的事情是否可能 或者我是否应该尝试不同的方法 我尝试了几种不
  • 如何(安全)将 Python 对象发送到我的 Flask API?

    我目前正在尝试构建一个 Flask Web API 它能够在 POST 请求中接收 python 对象 我使用 Python 3 7 1 创建请求 使用 Python 2 7 运行 API 该 API 设置为在我的本地计算机上运行 我试图发

随机推荐

  • 使用 `` 或 `@import` 包含 CSS - 哪个更好?

    我有一个网站 并且有多个用于打印 电视 屏幕 手持设备等的 css 样式表 我想知道这些方法中哪一种更好用 性能 可用 性等 or
  • 正则表达式问题组名称重新定义?

    所以我有这个正则表达式 s P
  • 线串长度(以英里为单位)

    我将运行数据表示为 Shapely LineStrings 其中 LineString 中的每个点都是一个坐标 我试图计算出以英里为单位的 LineString 长度 我知道 LineString 有一个length方法 但我不知道结果是什
  • 从 Spring Boot jar 文件运行非主类

    我有一个 spring boot jar 文件 里面有一个清单文件 如下所示 Manifest Version 1 0 Implementation Title myApp Implementation Version 0 1 Built
  • delphi中如何分割字符串

    我只需要分割一个字符串 例如 STANS Payment chk 1 1210 000进入一个基于数组 字符串列表中的结果将是 STANS Payment chk 1 1210 000 创建一个TStringList并将逗号分隔的字符串分配
  • 从订单示例构建订单簿[关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 我正在寻找从订单构造订单簿的代码 例如 如果订单是 side price quantity buy 100 1 buy 101 10 buy
  • gcc中有128位整数吗?

    我想要一个 128 位整数 因为我想存储两个 64 位数字相乘的结果 gcc 4 4及以上版本有这样的东西吗 对于 C23 之前的 GCC 原始 128 位整数类型是仅在 64 位目标上可用 因此即使您已经检测到最新的 GCC 版本 您也需
  • 在事件处理程序中调用自定义挂钩

    我有一个名为的自定义钩子useFetchMyApi将 fetch 调用包装到 API 端点 函数钩子接受一个参数 并将其包含在帖子正文中 数据数组输出取决于钩子参数 在UI上 App组件调用useFetchMyApi一次 按钮单击处理程序将
  • 包含相同对象列表的对象的实体框架映射

    目前在我的代码中我正在做这样的事情 public class Subject private List
  • FLOPS 什么是真正的 FLOPS

    我来自这个线程 FLOPS Intel 核心并使用 C 语言对其进行测试 内积 https stackoverflow com questions 1536867 flops intel core and testing it with c
  • 将多个 div 与父级底部对齐

    我想将父级底部的 3 个 div 与100 height I tried parent height 100 display table cell vertical align bottom 但不起作用 即使您更改窗口的大小或分辨率 它也应
  • Android Studio Canary 2020.3.1:Kotlin 未解析的引用

    将 Android Studio Canary 版本更新到 3 1 后 我开始收到属于 kotlin 标准库的函数的 Kotlin 未解析引用 并且该问题似乎也影响了 Android Studio 导入正确库的能力 我相信我的问题类似于th
  • 为什么 null 不 in(1,2,3) false [重复]

    这个问题在这里已经有答案了 是否期望当我测试空值时not在列表中 结果始终为 false 那是 select Hello world where null not in 1 2 3 不要选择任何内容 因为 null not in 1 2 3
  • pycrypto 和 Google 应用引擎

    如何将 pycrypto 与 GAP 结合使用 It says here https developers google com appengine docs python tools libraries它不支持最新版本 这是否意味着我必须
  • 如何将值从子功能组件传递到父类组件?

    我有一个父类组件和一个子功能组件 我们如何将值从这种类型的子组件传递到父组件 我见过一些将值从子类组件传递到父类组件的示例 父组件 import React from react import ChildComponent from Chi
  • Heroku 上的 Resque 后台作业

    我在 Heroku 上遇到了一个非常奇怪的问题 我已经花了一段时间来解决这个问题 我的应用程序有一些外部 API 调用和邮件程序 我已将它们设置为在后台运行 ActiveJob 在 Heroku 上 我设置了两个工作人员 并且我正在使用 R
  • 如何在 Eclipse 中格式化 html 文件?

    XML 格式工作得很好 但 html 格式却不行 事实上 如果我对 html 文件使用 cmd shift F 它几乎会左对齐所有内容 我附上了之前和之后的照片 有谁知道如何解决这一问题 我尝试了 HTML 格式首选项 但没有成功 请注意
  • Maxima 中 Maple“unapply”或 Mathematica“Function”的模拟

    在 Wolfram Mathematica 中 我们可以定义作用于函数 即返回函数的函数 的运算符 例如至于下面示例中作用于两个参数的函数的第一个参数的乘法运算符 X f Function x y x f x y 然后我们可以将此运算符应用
  • 给定一个CGPath,如何让它弯曲?

    在下面的屏幕截图中 当您拖动单词气球的尾部 从气球连接到人的嘴的东西 时 形状会弯曲 如图中两个气球尾部之间的差异所示 我想知道 这是怎么做到的 我假设您需要从 CGPath 开始并对它做一些事情 有人知道这是什么吗 更新 所以如果我想弯曲
  • 同时读取子进程的 stdout 和 stderr

    我正在尝试在 Python 中运行一个冗长的命令 输出到 stdout 和 stderr 我想轮询子进程并将输出写入单独的文件 根据这个答案 我尝试了以下操作python 中 subprocess PIPE 的非阻塞读取 https sta