Python 您所在的位置:网站首页 华为电脑开机按什么键才能开机 Python

Python

#Python| 来源: 网络整理| 查看: 265

Python-subprocess执行命令并将输出劫持实现实时记录到日志 前言

在写我自己的练手项目的时候,需要写一系列Python脚本来帮助我进行运维/环境配置,我希望这些脚本能够有比较好的日志记录。

这一篇博客中,我实现了日志同时向控制台和日志中进行输出,并且二者的日志等级、日志格式不相同。

这一篇博客中,我通过自定义日志的格式描述符,实现了定长的日志等级。

解决完日志问题后,我需要解决这些脚本的另一个问题:执行命令。

我写这一系列Python脚本就是因为我不擅长写shell脚本,而且Python脚本在我的开发环境(Windows)和运行环境(Linux)都可以运行。其需要担当的很重要的一个功能就是执行一些命令,如kubectl apply、mvn test等。

我希望执行这些命令的时候,能够实时地将命令的输出,记录到日志中。

本博客,我实现了执行命令,并将命令执行的输出进行劫持,实时记录到日志中。

参考 Python 官方文档 Python 官方文档 | logging模块 Python 官方文档 | 日志 HOWTO subprocess

关于多进程协同,Python主要有三个手段:

os.system()函数; multiprocessing模块; subprocess模块;

其中multiprocessing主要用于解决计算密集型计算中,Python由于GIL(全局解释器锁)的存在,多线程无法提升性能,而需要创建多个进程来加快计算的场景。

而os.system()则是阻塞式的,而且似乎无法将其创建的子进程的输出进行劫持。

subprocess模块几乎是唯一的选择。

Popen

subprocess虽然提供了简易使用的subprocess.run()方法,但是这个方法无法做到实时输出,也就是命令执行完成之后才能一次性获取命令的输出,即使传入了stdout=subprocess.PIPE要求其创建一个管道,仍旧是阻塞式的读取。stdout也可以指定为一个流,因此我们可以将其直接重定向到本程序的标准输出,但是logger并不是一个流。

所以我们只能使用更为底层的subprocess.Popen对象来进行操作。

process = subprocess.Popen( cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, )

通过这样的参数,创建子进程运行cmd指定的命令,同时创建与子进程关联的process对象,此时process.stdout和process.stderr都是可以调用readline方法来一行行读取字符串的管道。

实现

通过subprocess.Popen()创建Popen对象后,子进程开始运行,我们可以通过管道来读取子进程的输出。因为有两个管道都需要读取,并且它们没有提供检查管道是否读取的方法,只能阻塞地尝试进行读取,所以我们需要创建读取线程来分别读取这两个管道。

实现如下:

import logging import subprocess import shlex import threading class CommandExecutionException(Exception): def __init__(self, command: str, exit_code: int) -> None: super().__init__(f"command executed fail with exit-code={exit_code}: {command}") _logger = logging.getLogger(__name__) class TextReadLineThread(threading.Thread): def __init__(self, readline, callback, *args, **kargs) -> None: super().__init__(*args, **kargs) self.readline = readline self.callback = callback def run(self): for line in iter(self.readline, ""): if len(line) == 0: break self.callback(line) def cmd_exec(command: str, ensure_success: bool=True) -> int: _logger.info("executing command: {}".format(command)) cmd = shlex.split(command) process = subprocess.Popen( cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) _logger.debug("started command") def log_warp(func): def _wrapper(line: str): return func("\t" + line.strip()) return _wrapper read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info)) read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning)) read_stdout.start() read_stderr.start() read_stdout.join() _logger.debug("stdout reading finish") read_stderr.join() _logger.debug("stderr reading finish") ret = process.wait() _logger.debug("process finish") _logger.info("executed command with exit-code={}".format(ret)) if ensure_success and ret != 0: raise CommandExecutionException(command=command, exit_code=ret) return ret if __name__ == '__main__': _logger_trans = { "DEBUG": "DBG", "INFO": "INF", "WARNING": "WAR", "CRITICAL": "ERR" } _old_factory = logging.getLogRecordFactory() def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord: record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs) record.shortlevelname = _logger_trans[record.levelname] return record logging.setLogRecordFactory(factory) logging.basicConfig( level=logging.DEBUG, format='[%(asctime)s %(shortlevelname)s] %(message)s', datefmt="%Y/%m/%d %H:%M:%S" ) cmd_exec("ping localhost", ensure_success=False)

在TextReadLineThread的run方法中,我们使用了内置函数iter()的两个参数的形式,其第一个参数为一个可调用对象,第二个参数为该可调用对象可能的输出,通过iter(func, sentinel)函数产生的迭代器,每次迭代时都无参数地调用func,直到func()返回sentinel时结束迭代。

关于main中日志配置,参考这一篇博客和这一篇博客,对日志格式进行增强,使得我们能够明确看到子进程的输出被劫持了。

这里我创建了两个线程,一个用于读取stdout,一个用于读取stderr,然后阻塞掉当前线程。其实可以只创建一个新的线程用于读取stderr,用当前线程读取stdout。

我使用了ping命令来进行测试,因为ping明显是隔一段时间输出一段,可以明显看出是不是实时进行日志记录。但是linux上ping默认是不会停止的,需要注意一下。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有