utils.py 文件源码

python
阅读 103 收藏 0 点赞 0 评论 0

项目:ops_tools 作者: juliovp01 项目源码 文件源码
def check_process_exists_and_amqp_connected(name):
    processes = filter(lambda p: check_process_name(name, p),
                       psutil.process_iter())
    if not processes:
        critical("%s is not running" % name)
    for p in processes:
        try:
            connections = p.get_connections(kind='inet')
        except psutil.NoSuchProcess:
            continue
        found_amqp = (
            len(list(itertools.takewhile(lambda c: len(c.remote_address) <= 1
                                         or c.remote_address[1]
                                         != AMQP_PORT, connections)))
            != len(connections))
        if found_amqp:
            ok("%s is working." % name)
    critical("%s is not connected to AMQP" % name)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号