是否可以修改下面的代码,使其从"stdout"和"stderr"打印出来:

  • 打印在终端上(实时),
  • 最后存储在输出错误变量中?

代码:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess

def run_cmd(command, cwd=None):
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    rc = p.returncode
    outs = outs.decode('utf-8')
    errs = errs.decode('utf-8')

    return (rc, (outs, errs))

感谢@unutbu,特别感谢@j-f-sebastian,最终功能:

#!/usr/bin/python3
# -*- coding: utf-8 -*-


import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    for line in iter(pipe.readline, b''):
        for func in funcs:
            func(line.decode('utf-8'))
    pipe.close()


def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)


def run_cmd(command, cwd=None, passthrough=True):
    outs, errs = None, None

    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1
        )

    if passthrough:

        outs, errs = [], []

        q = Queue()

        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append])
            )

        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append])
            )

        writer_thread = Thread(
            target=write_output, args=(q.get,)
            )

        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()

        proc.wait()

        for t in (stdout_thread, stderr_thread):
            t.join()

        q.put(None)

        outs = ' '.join(outs)
        errs = ' '.join(errs)

    else:

        outs, errs = proc.communicate()
        outs = '' if outs == None else outs.decode('utf-8')
        errs = '' if errs == None else errs.decode('utf-8')

    rc = proc.returncode

    return (rc, (outs, errs))

推荐答案

您可以生成线程来读取stdout和stderr管道、写入公共队列以及附加到列表.然后使用第三个线程打印队列中的项目.

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()

def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

使用第三个线程(而不是让前两个线程都直接打印到终端)的原因是为了防止两个打印语句同时发生,这有时会导致文本乱码.


上面调用random_print.py,随机打印到stdout和stderr:

import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout,sys.stderr])
    f.write(str(i)+'\n')
    f.flush()
    time.sleep(0.1)

此解决方案借鉴了J. F. Sebastian, here的代码和 idea .


下面是一个适用于类Unix系统的替代解决方案,使用select.select:

import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''

def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break

fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

这个解决方案使用了Adam Rosenfield's post, here条代码和 idea .

Python-3.x相关问答推荐

创建自定义函数时定义数据类型

使用Python请求从特定URL下载图像时出错

是否有必要使用Threads()中的args显式地将共享变量传递给Python中的线程函数或直接访问它?

为什么在Python中使用RANDINT函数时会出现此TypeError?

可以在 Python 的上下文管理器中调用 sys.exit() 吗?

在 Python 中比较和排序列之间的值(带有不匹配列)

添加任意数量的 pandas 数据框

匹配语句NaN

它们是否同样存储在python3的内存中?

按字母顺序排序列表 (OrderFilter),条件是值为 '' 的条目位于列表 DRF 的末尾

Python Regex 查找给定字符串是否遵循交替元音、辅音或辅音、元音的连续模式

pip 找不到最新的软件包版本

Dask 多阶段资源设置导致 Failed to Serialize 错误

获取以特定字母开头的姓氏

Python 3 变量名中接受哪些 Unicode 符号?

Python中调用者函数的访问变量

aiohttp+sqlalchemy:在回滚无效事务之前无法重新连接

三个参数的reduce函数

为什么某些代码在 Python2 中是确定性的,而在 Python 3 中是非确定性的?

字典理解中的操作顺序