python stdout.read()阻塞_通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait...

2023-05-16

http://blog.chinaunix.net/uid-23504396-id-4661783.html

执行subprocess的时候,执行不是问题

最麻烦的是获取进程执行后的回显来确认是否正确执行,还不能阻塞

还要获取进程执行后的返回状态确认进程是否正确结束,也不能阻塞

分开解决这个问题

我们先解决第一个问题,获取回显

一般获取回显,代码都是如下写法

点击(此处)折叠或打开

sub_process = subprocess.Popen(command, stdin = subprocess.PIPE,stdout = subprocess.PIPE,stderr = subprocess.PIPE, shell = True)

为了搞清楚subprocess是怎么获取子进程stdout的,我们首先看看 subprocess.PIPE是什么

进入代码里可以看见subprocess.PIPE 直接是个int -1

再看看网上一般获取subprocess回显的代码

点击(此处)折叠或打开

lines = sub_process.stdout.readline()

subprocess.PIPE是-1,为什么Popen这个类的stdout变成了什么对象,可以用readline方法呢

打印type可以知道Popen对象的stdout的类型是file,我们看看subprocess里做了什么操作。

我们看看Popen的init方法(python 2.7.8)

stdout传入_get_handles函数准换出(p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite)

点击(此处)折叠或打开

(p2cread, p2cwrite,

c2pread, c2pwrite,

errread, errwrite) = self._get_handles(stdin, stdout, stderr)

p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite  传入_execute_child中,这个函数看名字就知道是真正的执行函数

点击(此处)折叠或打开

self._execute_child(args, executable, preexec_fn, close_fds,

cwd, env, universal_newlines,

startupinfo, creationflags, shell,

p2cread, p2cwrite,

c2pread, c2pwrite,

errread, errwrite)

p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite传入执行函数后,stdout等通过fdopen函数转换问file对象

点击(此处)折叠或打开

if p2cwrite is not None:

self.stdin = os.fdopen(p2cwrite, 'wb', bufsize)

if c2pread is not None:

if universal_newlines:

self.stdout = os.fdopen(c2pread, 'rU', bufsize)

else:

self.stdout = os.fdopen(c2pread, 'rb', bufsize)

if errread is not None:

if universal_newlines:

self.stderr = os.fdopen(errread, 'rU', bufsize)

else:

self.stderr = os.fdopen(errread, 'rb', bufsize)

我们先看看_get_handles方法,部分代码如下

点击(此处)折叠或打开

def _get_handles(self, stdin, stdout, stderr):

"""Construct and return tuple with IO objects:

p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite

"""

p2cread, p2cwrite = None, None

c2pread, c2pwrite = None, None

errread, errwrite = None, None

if stdin is None:

pass

elif stdin == PIPE:

p2cread, p2cwrite = self.pipe_cloexec()

elif isinstance(stdin, int):

p2cread = stdin

else:

# Assuming file-like object

p2cread = stdin.fileno()

再跟踪进去看pipe_cloexec

点击(此处)折叠或打开

def pipe_cloexec(self):

"""Create a pipe with FDs set CLOEXEC."""

# Pipes' FDs are set CLOEXEC by default because we don't want them

# to be inherited by other subprocesses: the CLOEXEC flag is removed

# from the child is FDs by _dup2(), between fork() and exec().

# This is not atomic: we would need the pipe2() syscall for that.

r, w = os.pipe()

self._set_cloexec_flag(r)

self._set_cloexec_flag(w)

return r, w

可以知道,当stdout赋值为subprocess.PIPE(即-1)时,subprocess内部通过os.pipe()创建一个管道,并返回管道的读,写文件描述符

点击(此处)折叠或打开

os.pipe()

Create a pipe. Return a pair of file descriptors (r, w) usable for reading and writing, respectively.

_set_cloexec_flag函数暂时不用详细看了,只是通过fcntl设置下文件做控制。

所以从这里我可以看出stdout等传入subprocess.PIPE后,这个值只是作为一个判断值,判断为此值以后,内部通过os.piep()用作输入输出传送。

由于subprocess内部创建的pipe()大小不可控,所以推举做法是使用StringIO创建一个内存文件对象,并传入这个对象的fileno,参考文章

http://backend.blog.163.com/blog/static/2022941262014016710912/

现在就剩下单问题就是,这个管道如何获得子进程的输入输出的呢,这就要看_execute_child里是怎么做的了

具体说明我直接在下面源代码里注释说明,最后再做总结

点击(此处)折叠或打开

def _execute_child(self, args, executable, preexec_fn, close_fds,

cwd, env, universal_newlines,

startupinfo, creationflags, shell,

p2cread, p2cwrite,

c2pread, c2pwrite,

errread, errwrite):

"""Execute program (POSIX version)"""

if isinstance(args, types.StringTypes):

args = [args]

else:

args = list(args)

if shell:

args = ["/bin/sh", "-c"] + args

if executable:

args[0] = executable

if executable is None:

executable = args[0]

#这里又创建了一个管道,这个管道只用来获取自进程try后except出来的内容,不是获取stderr

errpipe_read, errpipe_write = self.pipe_cloexec()

try:

try:

gc_was_enabled = gc.isenabled()

#这里关闭了gc回收,防止对象被回收,这里值得学习。

gc.disable()

try:

self.pid = os.fork()

except:

if gc_was_enabled:

gc.enable()

raise

self._child_created = True

if self.pid == 0:

#如果pid为0,表示自己是子进程,执行下面代码(父进程获取到的是子进程的PID,不执行此代码)

#父子进程pipe()通信原理——利用pipe()建立起来的无名文件(无路径名)。只用该系统调用所返回的文件描述符来标识该文件.

#只有调用pipe()的进程及其子孙进程才能识别此文件描述符,才能利用该文件(管道)进行通信。当这些进程不再使用此管道时,核心收回其索引结点。

#如果Pope对象初始化的时候,stdin stdout stderr都用subprocess.PIPE的话,那么fork前会创建3个管道,并传入对应的文件描述符进来

try:

#关闭从父进程复制过来的的不需要的管道的一端

if p2cwrite is not None:

os.close(p2cwrite)

if c2pread is not None:

os.close(c2pread)

if errread is not None:

os.close(errread)

os.close(errpipe_read)

#下面都是做了一些文件描述符复制操作,反正通过下面的代码将子进程的输出传到父进程

#那些描述符复制操作基本就相当于把子进程的stdout、stdin、stderr的fd绑定的父进程传过来的文件描述符上

# When duping fds, if there arises a situation

# where one of the fds is either 0, 1 or 2, it

# is possible that it is overwritten (#12607).

if c2pwrite == 0:

c2pwrite = os.dup(c2pwrite)

if errwrite == 0 or errwrite == 1:

errwrite = os.dup(errwrite)

# Dup fds for child

def _dup2(a, b):

# dup2() removes the CLOEXEC flag but

# we must do it ourselves if dup2()

# would be a no-op (issue #10806).

if a == b:

self._set_cloexec_flag(a, False)

elif a is not None:

os.dup2(a, b)

_dup2(p2cread, 0)

_dup2(c2pwrite, 1)

_dup2(errwrite, 2)

#2.7才有的写法,2.6这样写报错,2.7大概这样写比list里找快一点,所以用了dict

#如果管道文件描述符大于2的话,关闭从主进程赋值过来的管道的一端,

closed = { None }

for fd in [p2cread, c2pwrite, errwrite]:

if fd not in closed and fd > 2:

os.close(fd)

closed.add(fd)

#这里控制关闭前面用来保存except输出的管道

if close_fds:

self._close_fds(but=errpipe_write)

#切换下执行目录防止运行出错,这里也值得学习!

if cwd is not None:

os.chdir(cwd)

if preexec_fn:

preexec_fn()

#可以看到,最终是通过execvp/execvpe来执行系统命令的

if env is None:

os.execvp(executable, args)

else:

os.execvpe(executable, args, env)

except:

exc_type, exc_value, tb = sys.exc_info()

# Save the traceback and attach it to the exception object

exc_lines = traceback.format_exception(exc_type,

exc_value,

tb)

exc_value.child_traceback = ''.join(exc_lines)

#子进程将错误信息写入接受except的管道的写端

os.write(errpipe_write, pickle.dumps(exc_value))

#这里退出子进程

os._exit(255)

#父进程启动自进程后,重新打开gc回收

if gc_was_enabled:

gc.enable()

finally:

#父关闭保存子进程except输出的管道的写端

os.close(errpipe_write)

#父进程也关闭不需要使用的管道的一端

if p2cread is not None and p2cwrite is not None:

os.close(p2cread)

if c2pwrite is not None and c2pread is not None:

os.close(c2pwrite)

if errwrite is not None and errread is not None:

os.close(errwrite)

#通过获取except输出的管道的读端获取最大1M的数据

data = _eintr_retry_call(os.read, errpipe_read, 1048576)

finally:

#父关闭保存子进程except输出的管道的读端

os.close(errpipe_read)

#如果有子进程except输出,抛出自定义错误,init函数那边会try到并做相应处理

if data != "":

try:

_eintr_retry_call(os.waitpid, self.pid, 0)

except OSError as e:

if e.errno != errno.ECHILD:

raise

child_exception = pickle.loads(data)

raise child_exception

下面我们总结下,创建Popen对象时,我们传入subprocess.PIPE。

内部通过os.pipe()创建1-3个管道

生成的子进程复制了这些管道的文件描述符,子进程内部将自己的输出绑定到这写管道上

父进程通过os.fdopen将管道的文件描述符打开为file对象

并赋值给self.stdin  self.stdout stderr

因为是file对象,我们就可以直接通过read、readline、readlines等方法获取回显的字符串了

但是由于file对象的read、readline、readlines方法都是阻塞的,那么我们可以这样。

新建立一个线程去读取,并把读出来的内容塞入一个列表,每次我们主进程都去读取这个列表的最后一列

线程中读取后写入列表的延迟 需要大于主进程读取列表最后一列的延迟,以免判断内容还没被主进程读取已经进入下一列

读取子进程回显函数

点击(此处)折叠或打开

def stdout_theard(end_mark,cur_stdout,stdout_lock,string_list):

#用户获取subprocess的stdout输出的线程,防止阻塞

#cur_stdout是一个file对象,end_mark是个随机字符串,获取到这个字符串表明结束

#先暂停0.01秒

time.sleep(0.01)

for i in range(3000):

try:

out_put = cur_stdout.readline()

if not out_put:

#添加结束标记

stdout_lock.acquire()

string_list.append(end_mark)

stdout_lock.release()

break

if out_put == end_mark:

#out put正好和end_mark相等的特殊情况

continue

#外部获取到指定内容会清理string_list列表,所以要加锁

stdout_lock.acquire()

string_list.append(out_put.rstrip().lstrip())

stdout_lock.release()

time.sleep(0.03)

except:

print 'wtffff!!!!!!tuichule !!'

break

主进程中启动线程

点击(此处)折叠或打开

stdout_list = []

stdout_lock = threading.Lock()

end_mark = 'end9c2nfxz'

cur_stdout_thread = threading.Thread(target=stdout_theard, args=(end_mark,sub_process.stdout,stdout_lock,stdout_list))

cur_stdout_thread.setDaemon('True')

cur_stdout_thread.start()

主进程中判断子进程回显内容是否正确

我的例子是的作用是  erl进程里输入command_reload_list里的所有命令,并判断并记录每个命令执行后是否有ok_str返回

点击(此处)折叠或打开

for command_reload_dict in command_reload_list:

sub_process.stdin.write(command_reload_dict['com'] + '\r\n')

#每个命令执行后通过线程修改的str list的最后一个元素来获取取回显的最后一行

#得到返回值等于ok_str的为正确,延迟0.2后退出并清理回显,否则总共等待300*0.01秒

ok_str = 'load module %s true' % command_reload_dict['mod']

for i in xrange(300):

if len(stdout_list)>0:

#获得正确的返回,退出

if stdout_list[-1] == ok_str:

#记录当前模块热更成功

command_reload_dict['res'] = 'ok'

break

if stdout_list[-1] == end_mark:

#遇到end_mark 说明读线程已经结束,说明有错,直接退出

return_value['msg'] += 'reload mod process has been exit in [%s]' % command_reload_dict['mod']

return return_value

break

time.sleep(0.01)

#清除上个reload命令产生的回显

stdout_lock.acquire()

del stdout_list[:]

stdout_lock.release()

#子进程输入退出命令

sub_process.stdin.write('q().\r\n')

#等待tmp erl 进程退出

for i in xrange(300):

if len(stdout_list)>0:

if stdout_list[-1] == end_mark:

break

time.sleep(0.01)

=======================================第二个问题的分割线=========================================

进程执行后的返回状态确认进程是否正确结束,不能阻塞

之前我有接触过这个问题的,当时还没细看subprocess源码

http://blog.chinaunix.net/uid-23504396-id-4471612.html

我现在的写法

点击(此处)折叠或打开

if stop_process.poll() is None:

try:

if stop_process.stdout:

stop_process.stdout.close()

if stop_process.stderr:

stop_process.stderr.close()

stop_process.terminate()

time.sleep(0.5)

if stop_process.poll() is None:

stop_process.kill()

time.sleep(0.2)

if stop_process.poll() is None:

print 'wtf!!!!'

else:

stop_process.wait()

else:

stop_process.wait()

except:

print 'wtf?'

上面代码我一直有个疑问,poll()之后如果有问题进程还没结束怎么办?

因为sub_process.wait()是阻塞的,所以我在poll以后直接sub_process.wait()是不是也会被卡住?

subprocess的wati到底调用了什么?

当然我也可以像获取回显那样,启一个线程,主进程通过一个可以指定次数的循环来获取wait返回。

不过这样做太绕了,所以我们直接进代码看,把wait彻底搞明白

点击(此处)折叠或打开

def poll(self):

return self._internal_poll()

点击(此处)折叠或打开

def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,

_WNOHANG=os.WNOHANG, _os_error=os.error, _ECHILD=errno.ECHILD):

"""Check if child process has terminated. Returns returncode

attribute.

This method is called by __del__, so it cannot reference anything

outside of the local scope (nor can any methods it calls).

"""

if self.returncode is None:

try:

pid, sts = _waitpid(self.pid, _WNOHANG)

if pid == self.pid:

self._handle_exitstatus(sts)

except _os_error as e:

if _deadstate is not None:

self.returncode = _deadstate

if e.errno == _ECHILD:

# This happens if SIGCLD is set to be ignored or

# waiting for child processes has otherwise been

# disabled for our process. This child is dead, we

# can not get the status.

# http://bugs.python.org/issue15756

self.returncode = 0

return self.returncode

再看看wait的代码

点击(此处)折叠或打开

def wait(self):

"""Wait for child process to terminate. Returns returncode

attribute."""

while self.returncode is None:

try:

pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0)

except OSError as e:

if e.errno != errno.ECHILD:

raise

# This happens if SIGCLD is set to be ignored or waiting

# for child processes has otherwise been disabled for our

# process. This child is dead, we can not get the status.

pid = self.pid

sts = 0

# Check the pid and loop as waitpid has been known to return

# 0 even without WNOHANG in odd situations. issue14396.

if pid == self.pid:

self._handle_exitstatus(sts)

return self.returncode

看到这里就明白了,poll和wait最终调用的是os.waitpid,但是poll是非阻塞的wait是阻塞的.....

我们看看python的文档

点击(此处)折叠或打开

os.waitpid(pid, options)

The details of this function differ on Unix and Windows.

On Unix: Wait for completion of a child process given by process id pid, and return a tuple containing its process id and exit status indication (encoded as for wait()). The semantics of the call are affected by the value of the integer options, which should be 0 for normal operation.

os.WNOHANG

The option for waitpid() to return immediately if no child process status is available immediately. The function returns (0, 0) in this case.

所以,发送kill信号后,pool()后就不需要wait了

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python stdout.read()阻塞_通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait... 的相关文章

  • python 字典中没有值

    是否可以检查 dict 中的无值 dict a None b 12345 c None My code for k v in d items if d k None print good else print Bad 执行上面的代码片段后打
  • 将列的百分比设置为 0 (pandas)

    我有一个 pandas 数据框 我想将列的某些百分比设置为 0 假设 df 有两列 A B 1 6 2 7 3 8 4 4 5 9 我现在想将 df 的前 20 和后 20 的 B 设置为 0 A B 1 0 2 7 3 8 4 4 5 0
  • 在 Python 中使用 argparse 处理无效参数

    我在用argparse https docs python org 2 library argparse html解析命令行参数 默认情况下 在收到无效参数时 它会打印帮助消息并退出 是否可以自定义 argparse 在收到无效参数时的行为
  • 在 multiprocessing.connection.Listener.accept() 给定时间后引发 TimeOutError

    我正试图打断multiprocessing connection Listener accept 但迄今为止尚未成功 由于它不提供timeout参数 我想也许我可以使用socket setdefaulttimeout 打断它 正如帖子中所建
  • Dataproc:使用 PySpark 从 BigQuery 读取和写入数据时出现错误

    我正在尝试读取一些 BigQuery 数据 ID my project mydatabase mytable 原始名称受保护 来自用户管理的 Jupyter Notebook 实例 内部Dataproc https cloud google
  • Python 列联表

    作为我正在编写的项目的一部分 我正在生成很多很多列联表 工作流程是 获取具有连续 浮点 行的大型数据数组 并通过分箱将其转换为离散整数值 例如 结果行的值为 0 9 将两行切片为向量 X 和 Y 并生成列联表 https en wikipe
  • Python/pandas:从两个数据帧中查找匹配值并返回第三个值

    我有两个不同的数据帧 df1 df2 具有完全不同的形状 df1 64 6 df2 564 9 df1 包含一列 df1 objectdesc 其中的值 字符串 也可以在 df2 df2 objdescription 的列中找到 由于两个数
  • PyCharm 虚拟环境和 Anaconda 环境有什么区别?

    当我在 PyCharm 中创建新项目时 它会创建一个新的虚拟环境 我读到 当我执行Python脚本时 它们是使用此环境中的解释器而不是系统环境来执行的 因此 如果我需要安装一些软件包 我只能将它们安装在这个环境中 而不是在系统环境中 这很酷
  • pandas dataframe 视图与复制,我如何区分?

    有什么区别 pandas df loc col a col b and df loc col a col b 下面的链接没有提到后者 尽管它有效 两者都拉视图吗 第一个拉取视图 第二个拉取副本吗 http pandas pydata org
  • 将文件转换为 Ascii 抛出异常

    后果我之前的问题 https stackoverflow com questions 31742609 how to strip the leading unciode characters from a file 31742694 nor
  • 打开文件对象的大小

    有没有办法找到当前打开的文件对象的大小 具体来说 我正在使用 tarfile 模块来创建 tarfile 但我不希望 tarfile 超过特定大小 据我所知 tarfile 对象是类似文件的对象 所以我想通用的解决方案会起作用 ls la
  • 如何使用不同的类和导入动态地使用 Python 日志记录来更改文件句柄

    我无法执行即时日志文件句柄更改 例如 我有3节课 one py import logging class One def init self txt logging debug Hey I m the class One and I say
  • Python 中函数未定义错误

    我试图在 python 中定义一个基本函数 但当我运行一个简单的测试程序时 我总是收到以下错误 gt gt gt pyth test 1 2 Traceback most recent call last File
  • 如何将目录结构解析为字典?

    我有目录结构列表 例如 a b a b c a b c d a b c e a b c f g a b c f h a b c f i 我想将它转换成像树结构一样的字典 a b c d None e None f g None h None
  • 跳过一个端点的 Flask 日志记录?

    我有一个 Python Flask 应用程序 有一项运行状况检查经常访问一个端点 但我不想在日志中看到它 如何仅禁用一个 GET 端点的日志记录 而保留其他所有端点的日志记录 艾蒂安 贝尔萨克为我指明了正确的方向 这就是我的实现方式 fro
  • 在 Python 的内置数字类型上,repr 和 str 总是相同吗?

    Are repr and strPython 内置数字类型相同 int bool float and complex 或者是否存在 深奥的 两者可能产生不同结果的情况 SO的相关问题 例如this one https stackoverfl
  • pandas 数据框中的 count 和 countif

    我有一个 DF 如下所示 trainee course completed days overdue Ava ABC Yes 0 Bob ABC Yes 1 Charlie DEF No 10 David DEF Yes 0 Emily D
  • django:自动为现有用户创建用户配置文件

    我今天在我的项目中添加了一个新的 UserProfile 模型 class UserProfile models Model user models OneToOneField User def unicode self return u
  • Python ctypes:SetWindowsHookEx 回调函数从未被调用

    我正在尝试用 Python 编写一个程序 该程序可以识别何时显示警报框 对话框 它正在处理多个监视器 我希望它在任务栏图标闪烁 弹出错误 通知等时在辅助监视器上显示可视化效果 据我所知 检测这些事件的方法是使用消息挂钩 如下所述 http
  • 在 Pandas DataFrame 中拆分列表

    我有一个包含多列的 csv 文件 使用 pandas 我将此 csv 文件读入数据帧 并有一个日期时间索引和五六个其他列 其中一列是时间戳列表 下面带有索引的示例 CreateDate TimeStamps 4 1 11 Timestamp

随机推荐