python类net_connections()的实例源码

proc.py 文件源码 项目:opensnitch 作者: evilsocket 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_pid_by_connection(src_addr, src_p, dst_addr, dst_p, proto='tcp'):
    # We always take the first element as we assume it contains only one
    # It should not be possible to keep two connections which are the same.
    for conn in psutil.net_connections(kind=proto):
        if proto == 'tcp':
            if conn.laddr != (src_addr, int(src_p)):
                continue

            if conn.raddr != (dst_addr, int(dst_p)):
                continue

        # UDP gives us a very limited dataset to work with
        elif proto == 'udp':
            if conn.laddr[1] != int(src_p):
                continue

        return conn.pid

    logging.warning("Could not find process for %s connection %s:%s -> %s:%s",
                    proto,
                    src_addr,
                    src_p,
                    dst_addr,
                    dst_p)

    return None
util.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def already_listening(port, renewer=False):
    """Check if a process is already listening on the port.

    If so, also tell the user via a display notification.

    .. warning::
        On some operating systems, this function can only usefully be
        run as root.

    :param int port: The TCP port in question.
    :returns: True or False.

    """
    try:
        net_connections = psutil.net_connections()
    except psutil.AccessDenied as error:
        logger.info("Access denied when trying to list network "
                    "connections: %s. Are you root?", error)
        # this function is just a pre-check that often causes false
        # positives and problems in testing (c.f. #680 on Mac, #255
        # generally); we will fail later in bind() anyway
        return False

    listeners = [conn.pid for conn in net_connections
                 if conn.status == 'LISTEN' and
                 conn.type == socket.SOCK_STREAM and
                 conn.laddr[1] == port]
    try:
        if listeners and listeners[0] is not None:
            # conn.pid may be None if the current process doesn't have
            # permission to identify the listening process!  Additionally,
            # listeners may have more than one element if separate
            # sockets have bound the same port on separate interfaces.
            # We currently only have UI to notify the user about one
            # of them at a time.
            pid = listeners[0]
            name = psutil.Process(pid).name()
            display = zope.component.getUtility(interfaces.IDisplay)
            extra = ""
            if renewer:
                extra = (
                    " For automated renewal, you may want to use a script that stops"
                    " and starts your webserver. You can find an example at"
                    " https://letsencrypt.org/howitworks/#writing-your-own-renewal-script"
                    ". Alternatively you can use the webroot plugin to renew without"
                    " needing to stop and start your webserver.")
            display.notification(
                "The program {0} (process ID {1}) is already listening "
                "on TCP port {2}. This will prevent us from binding to "
                "that port. Please stop the {0} program temporarily "
                "and then try again.{3}".format(name, pid, port, extra),
                height=13)
            return True
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Perhaps the result of a race where the process could have
        # exited or relinquished the port (NoSuchProcess), or the result
        # of an OS policy where we're not allowed to look up the process
        # name (AccessDenied).
        pass
    return False
server_handler.py 文件源码 项目:PyGNS3 作者: mvdwoord 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _getDebugData():
        try:
            connections = psutil.net_connections()
        # You need to be root for OSX
        except psutil.AccessDenied:
            connections = None

        try:
            addrs = ["* {}: {}".format(key, val) for key, val in psutil.net_if_addrs().items()]
        except UnicodeDecodeError:
            addrs = ["INVALID ADDR WITH UNICODE CHARACTERS"]

        data = """Version: {version}
OS: {os}
Python: {python}
CPU: {cpu}
Memory: {memory}

Networks:
{addrs}

Open connections:
{connections}

Processus:
""".format(
            version=__version__,
            os=platform.platform(),
            python=platform.python_version(),
            memory=psutil.virtual_memory(),
            cpu=psutil.cpu_times(),
            connections=connections,
            addrs="\n".join(addrs)
        )
        for proc in psutil.process_iter():
            try:
                psinfo = proc.as_dict(attrs=["name", "exe"])
                data += "* {} {}\n".format(psinfo["name"], psinfo["exe"])
            except psutil.NoSuchProcess:
                pass

        data += "\n\nProjects"
        for project in Controller.instance().projects.values():
            data += "\n\nProject name: {}\nProject ID: {}\n".format(project.name, project.id)
            for link in project.links.values():
                data += "Link {}: {}".format(link.id, link.debug_link_data)

        return data
conn_collector.py 文件源码 项目:ops_agent 作者: sjqzhang 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_connections(self):
        listening = []
        connections = psutil.net_connections()
        for conn in connections:
            if not conn.pid:
                continue
            if conn.status == psutil.CONN_LISTEN:
                ip, port = conn.laddr
                # instance = '%s:%s' % (ip, port)
                if port not in listening:
                    listening.append(port)

        res = []
        clients = {}
        for conn in connections:
            if not conn.pid:
                continue
            if not conn.status == psutil.CONN_ESTABLISHED:
                continue

            ip, port = conn.laddr
            # instance = '%s:%s' % (ip, port)
            if port in listening:
                continue

            try:
                proc = psutil.Process(conn.pid)
            except psutil.NoSuchProcess:
                continue

            current = time.time()
            create_time = proc.create_time()
            if current - create_time < 60 * 5:
                continue

            rip, rport = conn.raddr
            record_id = '%s-%s:%s' % (conn.pid, rip, rport)
            client = clients.setdefault(record_id, {})
            if not client:
                client['client_pid'] = conn.pid
                client['client_process_create_time'] = int(create_time)
                client['client_pname'] = proc.name()
                client['client_cwd'] = proc.cwd()
                client['client_ip'] = ip
                # client['client_ports'] = '%s' % port
                client['client_port_num'] = 1
                client['server_ip'] = rip
                client['server_port'] = rport
                client['server_pid'] = ''
                client['server_pname'] = ''
            else:
                # client['client_ports'] += ', %s' % port
                client['client_port_num'] += 1

        res = clients.values()
        res.sort(key=lambda x: (x['client_pid'], x['server_port']))
        return res


问题


面经


文章

微信
公众号

扫码关注公众号