python类net_connections()的实例源码

test_linux.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_procfs_path(self):
        """Point psutil at an empty procfs directory and expect failures.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory; each listed system-wide API must then raise ``IOError``
        and ``psutil.Process()`` must raise ``psutil.NoSuchProcess``.
        """
        # (callable, keyword arguments) pairs, checked in this order.
        # psutil.pids is intentionally left out (it was disabled in the
        # original test as well).
        io_error_calls = (
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        )
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            for fun, kwargs in io_error_calls:
                self.assertRaises(IOError, fun, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Always put the procfs location back and drop the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
test_linux.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_net_connections_mocked(self):
        """Run net_connections(kind='unix') against a canned /proc/net/unix.

        ``open`` is patched so that only ``/proc/net/unix`` is replaced by
        an in-memory table (whose last row is a bare run of zeros); every
        other path is delegated to the real ``open``. The test then checks
        that the patched ``open`` was actually invoked.
        """
        # Grab the genuine builtin before it gets patched.
        orig_open = open

        def fake_open(name, *args, **kwargs):
            if name != '/proc/net/unix':
                return orig_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            patch_point = 'builtins.open'
        else:
            patch_point = '__builtin__.open'
        with mock.patch(patch_point, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# system disk
# =====================================================================
test_linux.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_net_connections_mocked(self):
        """Run net_connections(kind='unix') against a canned /proc/net/unix.

        ``open`` is patched so that only ``/proc/net/unix`` is replaced by
        an in-memory table (whose last row is a bare run of zeros); every
        other path is delegated to the real ``open``. The test then checks
        that the patched ``open`` was actually invoked.
        """
        # Grab the genuine builtin before it gets patched.
        orig_open = open

        def fake_open(name, *args, **kwargs):
            if name != '/proc/net/unix':
                return orig_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            patch_point = 'builtins.open'
        else:
            patch_point = '__builtin__.open'
        with mock.patch(patch_point, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# system disk
# =====================================================================
test_linux.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_procfs_path(self):
        """Point psutil at an empty procfs directory and expect failures.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory; each listed system-wide API must then raise ``IOError``
        and ``psutil.Process()`` must raise ``psutil.NoSuchProcess``.
        """
        # (callable, keyword arguments) pairs, checked in this order.
        # psutil.pids is intentionally left out (it was disabled in the
        # original test as well).
        io_error_calls = (
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        )
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            for fun, kwargs in io_error_calls:
                self.assertRaises(IOError, fun, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Always put the procfs location back and drop the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_net_connections_mocked(self):
        """Run net_connections(kind='unix') against a canned /proc/net/unix.

        ``open`` is patched so that only ``/proc/net/unix`` is replaced by
        an in-memory table (whose last row is a bare run of zeros); every
        other path is delegated to the real ``open``. The test then checks
        that the patched ``open`` was actually invoked.
        """
        # Grab the genuine builtin before it gets patched.
        orig_open = open

        def fake_open(name, *args, **kwargs):
            if name != '/proc/net/unix':
                return orig_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The builtins module is named differently on Python 2 and 3.
        if PY3:
            patch_point = 'builtins.open'
        else:
            patch_point = '__builtin__.open'
        with mock.patch(patch_point, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# --- system disk
# =====================================================================
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_procfs_path(self):
        """Point psutil at an empty procfs directory and expect failures.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory; each listed system-wide API must then raise ``IOError``
        and ``psutil.Process()`` must raise ``psutil.NoSuchProcess``.
        """
        # (callable, keyword arguments) pairs, checked in this order.
        # psutil.pids is intentionally left out (it was disabled in the
        # original test as well).
        io_error_calls = (
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        )
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            for fun, kwargs in io_error_calls:
                self.assertRaises(IOError, fun, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Always put the procfs location back and drop the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
redis_sampler_server_starter.py 文件源码 项目:pyabc 作者: neuralyzer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, host="localhost", port=6379):
        """Start a private redis-server plus two worker processes.

        The server port is chosen as one above the highest local port
        currently reported by ``psutil.net_connections()``. When no
        connections are reported at all, the ``port`` argument is used
        as-is (previously this case crashed with ``ValueError`` from
        ``max()`` on an empty list).

        Parameters:
            host: passed through to the parent initializer. The workers
                themselves are always started against "localhost".
            port: fallback port, used only when no connection is reported.
        """
        conn = psutil.net_connections()
        ports = [c.laddr[1] for c in conn]
        if ports:
            # NOTE(review): max + 1 can exceed 65535 if the top port is
            # already in use; not handled here.
            port = max(ports) + 1
        self.__port = port
        self.__redis_server = Popen(["redis-server", "--port", str(port)])
        # Give the server a moment to come up before workers connect.
        sleep(1)
        self.__worker = [
            Process(target=work,
                    args=(["--host", "localhost", "--port", str(port)],),
                    daemon=False)
            for _ in range(2)
        ]
        for p in self.__worker:
            p.start()
        super().__init__(host, port)
ServerPlugin.py 文件源码 项目:pybix 作者: lioncui 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_tcp_info(self):
        """Count system connections per status.

        Returns a dict that is empty when nothing was counted (or when an
        error occurred), otherwise ``{'tcpsum': {status: count, ...}}``.
        Connections whose status is ``'NONE'`` are ignored. Any exception
        is logged and reported through ``self.errorInfoDone`` instead of
        being propagated.
        """
        returnData = {}
        try:
            status_totals = {}
            for entry in psutil.net_connections():
                state = entry.status
                if state == 'NONE':
                    continue
                status_totals[state] = status_totals.get(state, 0) + 1
            # Only publish the 'tcpsum' key when at least one status was seen.
            if status_totals:
                returnData['tcpsum'] = dict(status_totals)
        except Exception:
            pybixlib.error(self.logHead + traceback.format_exc())
            self.errorInfoDone(traceback.format_exc())
        return returnData
netstat.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main():
    """Print a netstat-like table of all inet connections.

    One header row is printed, followed by one row per connection with
    protocol, local/remote address, status, PID and the owning program's
    name truncated to 15 characters.
    """
    templ = "%-5s %-30s %-30s %-13s %-6s %s"
    header = ("Proto", "Local address", "Remote address", "Status", "PID",
              "Program name")
    print(templ % header)

    # Map pid -> process name up front; processes that cannot be queried
    # are simply left out and show up as '?' below.
    proc_names = {}
    for proc in psutil.process_iter():
        try:
            proc_names[proc.pid] = proc.name()
        except psutil.Error:
            pass

    for conn in psutil.net_connections(kind='inet'):
        local = "%s:%s" % (conn.laddr)
        remote = "%s:%s" % (conn.raddr) if conn.raddr else ""
        row = (
            proto_map[(conn.family, conn.type)],
            local,
            remote or AD,
            conn.status,
            conn.pid or AD,
            proc_names.get(conn.pid, '?')[:15],
        )
        print(templ % row)
pyGetTCPPortStatisticsOnLocal.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_tcp_all_conns_count_top(top=10):
    """Return the local TCP endpoints with the most connections.

    Parameters:
        top: maximum number of entries to return (default 10).

    Returns:
        A list of ``((local_ip, local_port), count)`` tuples sorted by
        count, highest first, cut to at most ``top`` entries.
    """
    endpoint_counts = Counter(
        (sconn.laddr[0], sconn.laddr[1])
        for sconn in psutil.net_connections(kind="tcp")
    )

    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    ranked = sorted(endpoint_counts.items(), key=lambda item: item[1],
                    reverse=True)

    # Slicing past the end is harmless, so no clamping of `top` is needed.
    return ranked[0:top]
helpers.py 文件源码 项目:corvus-web-public 作者: eleme 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_ports(ports):
    """Raise TaskException if any of ``ports`` is a local port in use.

    Local ports are collected from every connection reported by
    ``psutil.net_connections()`` whose ``laddr`` is a tuple.
    """
    all_ports = set()
    for conn in psutil.net_connections():
        if isinstance(conn.laddr, tuple):
            all_ports.add(conn.laddr[1])

    ports_inuse = set(ports) & all_ports
    if not ports_inuse:
        return
    listing = ', '.join(map(str, ports_inuse))
    raise TaskException('ports {} are in use'.format(listing))
utils.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_free_port(address, initial_port):
    """Return an iterator over TCP ports that currently look unused.

    Not suitable for mission-critical use: another process may grab a
    port between the moment it is yielded and the moment the caller
    binds it.

    Parameters:
        address (string): IP address of the interface to check.
        initial_port (int): first port number to consider.

    Return:
        An iterator yielding, in increasing order from ``initial_port``,
        ports with no connection whose local address is
        ``(address, port)``. If ``psutil.net_connections()`` raises
        ``AccessDenied``, a plain counter starting at ``initial_port`` is
        returned instead, without any check.
    """

    # Probe once: on OSX this call requires root privileges.
    try:
        psutil.net_connections()
    except psutil.AccessDenied:
        return count(initial_port)

    def _port_is_taken(port):
        # A fresh snapshot is taken for every candidate port.
        for conn in psutil.net_connections():
            if (hasattr(conn, 'laddr') and
                    conn.laddr[0] == address and
                    conn.laddr[1] == port):
                return True
        return False

    def _unused_ports():
        for port in count(initial_port):
            if not _port_is_taken(port):
                yield port

    return _unused_ports()
_process.py 文件源码 项目:jubakit 作者: jubatus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_ports_in_use(cls):
    """
    Collect the local port of every IPv4 connection into a set.

    An empty set is returned (and an info message logged) when psutil
    denies access to the connection table.
    """
    try:
        inet4_conns = psutil.net_connections(kind='inet4')
        return {conn.laddr[1] for conn in inet4_conns}
    except psutil.AccessDenied:
        # Some platforms (such as OS X) require root privilege to list used
        # ports; in that case port conflicts are avoided on a best-effort
        # basis only.
        _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
        return set()
test_memory_leaks.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_net_connections(self):
        """Hand ``psutil.net_connections`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this view; given the
        file name it is presumably the memory-leak harness - confirm.
        """
        self.execute(psutil.net_connections)
test_linux.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call net_connections(kind='inet6') after trying to bind on ::1.

        NOTE(review): ``supports_ipv6`` and ``inet_ntop`` are unused in the
        body; presumably they are injected by mock.patch decorators that
        are not visible here - confirm.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            # Creating or binding the IPv6 socket is best-effort only.
            pass
        psutil.net_connections(kind='inet6')
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process and system-wide connections agree.

        ``proc_cons`` is compared, after sorting, with the system-wide
        connections belonging to ``pid``, each with its last field dropped
        (presumably the pid, which per-process tuples lack - confirm).
        """
        from psutil._common import pconn
        sys_cons = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                sys_cons.append(conn[:-1])
        if FREEBSD:
            # on FreeBSD all fds are set to -1
            proc_cons = [pconn(-1, *item[1:]) for item in proc_cons]
        self.assertEqual(sorted(proc_cons), sorted(sys_cons))
proc.py 文件源码 项目:pwndemo 作者: zh-explorer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pidof(target):
    """pidof(target) -> int list

    Get PID(s) of `target`.  The returned PID(s) depends on the type of `target`:

    - :class:`str`: PIDs of all processes with a name matching `target`.
    - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
    - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
      remote end of `target` if it is running on the host.  Otherwise an
      empty list.

    Arguments:
        target(object):  The target whose PID(s) to find.

    Returns:
        A list of found PIDs.
    """
    if isinstance(target, tubes.sock.sock):
        local = target.sock.getsockname()
        remote = target.sock.getpeername()

        def match(c):
            # The peer's connection is the mirror image of ours: its remote
            # address is our local one and vice versa.
            # The parameter must be the name used in the body; relying on
            # the comprehension variable leaking breaks on Python 3.
            return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')

        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tubes.process.process):
        return [target.proc.pid]

    else:
        return pid_by_name(target)
procinfo.py 文件源码 项目:ifi 作者: mike01 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None):
    """
    Look up the process owning a connection with the given addresses.

    port_src and port_dst must either both be given or both be None; when
    port_src is None only the IP addresses are compared.

    return -- [pid, command line, user name, sha256 hash object] for the
              first matching connection, or [] if none matches
    """
    if port_src is not None:
        wanted = (ip_src, ip_dst, port_src, port_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0], conn.laddr[1], conn.raddr[1])
    else:
        wanted = (ip_src, ip_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0])

    # Only connections with a remote address and a known pid can match.
    pids = [
        conn.pid for conn in net_connections()
        if len(conn.raddr) != 0 and conn.pid is not None and
        endpoints(conn) == wanted
    ]

    try:
        if len(pids) > 1:
            logger.warning("more than 1 matching process: %r", pids)
        first_pid = pids[0]
        proc = Process(first_pid)
        cmd = [first_pid, proc.cmdline(), proc.username()]
        hash_input = "%d%s%s" % (cmd[0], cmd[1], cmd[2])
        # Note: the hash object itself is appended, not its hex digest.
        cmd.append(hashlib.sha256(hash_input.encode("UTF-8")))
        logger.debug("process info: %r", cmd)
        return cmd
    except IndexError:
        # No matching connection.
        pass
    return []
topo_collector.py 文件源码 项目:ops_agent 作者: sjqzhang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_listen_ports(self):
        """
        Collect the listening ports on this host.
        :return: list of (local port, pid) tuples, one per connection
                 whose status is "LISTEN"
        """
        return [
            (conn.laddr[1], conn.pid)
            for conn in psutil.net_connections()
            if conn.status == "LISTEN"
        ]
test_memory_leaks.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_net_connections(self):
        """Hand ``psutil.net_connections`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this view; given the
        file name it is presumably the memory-leak harness - confirm.
        """
        self.execute(psutil.net_connections)
test_linux.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call net_connections(kind='inet6') after trying to bind on ::1.

        NOTE(review): ``supports_ipv6`` and ``inet_ntop`` are unused in the
        body; presumably they are injected by mock.patch decorators that
        are not visible here - confirm.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            # Creating or binding the IPv6 socket is best-effort only.
            pass
        psutil.net_connections(kind='inet6')
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process and system-wide connections agree.

        ``proc_cons`` is compared, after sorting, with the system-wide
        connections belonging to ``pid``, each with its last field dropped
        (presumably the pid, which per-process tuples lack - confirm).
        """
        from psutil._common import pconn
        sys_cons = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                sys_cons.append(conn[:-1])
        if FREEBSD:
            # on FreeBSD all fds are set to -1
            proc_cons = [pconn(-1, *item[1:]) for item in proc_cons]
        self.assertEqual(sorted(proc_cons), sorted(sys_cons))
test_memory_leaks.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_net_connections(self):
        """Hand ``psutil.net_connections`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this view; given the
        file name it is presumably the memory-leak harness - confirm.
        """
        self.execute(psutil.net_connections)
test_linux.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call net_connections(kind='inet6') after trying to bind on ::1.

        NOTE(review): ``supports_ipv6`` and ``inet_ntop`` are unused in the
        body; presumably they are injected by mock.patch decorators that
        are not visible here - confirm.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            # Creating or binding the IPv6 socket is best-effort only.
            pass
        psutil.net_connections(kind='inet6')
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process and system-wide connections agree.

        ``proc_cons`` is compared, after sorting, with the system-wide
        connections belonging to ``pid``, each with its last field dropped
        (presumably the pid, which per-process tuples lack - confirm).
        """
        from psutil._common import pconn
        sys_cons = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                sys_cons.append(conn[:-1])
        if FREEBSD:
            # on FreeBSD all fds are set to -1
            proc_cons = [pconn(-1, *item[1:]) for item in proc_cons]
        self.assertEqual(sorted(proc_cons), sorted(sys_cons))
proc.py 文件源码 项目:black_zone 作者: zh-explorer 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def pidof(target):
    """pidof(target) -> int list

    Get PID(s) of `target`.  The returned PID(s) depends on the type of `target`:

    - :class:`str`: PIDs of all processes with a name matching `target`.
    - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
    - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
      remote end of `target` if it is running on the host.  Otherwise an
      empty list.

    Arguments:
        target(object):  The target whose PID(s) to find.

    Returns:
        A list of found PIDs.
    """
    if isinstance(target, tubes.sock.sock):
        local = target.sock.getsockname()
        remote = target.sock.getpeername()

        def match(c):
            # The peer's connection is the mirror image of ours: its remote
            # address is our local one and vice versa.
            # The parameter must be the name used in the body; relying on
            # the comprehension variable leaking breaks on Python 3.
            return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')

        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tubes.process.process):
        return [target.proc.pid]

    else:
        return pid_by_name(target)
pyNetstatStatistic.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def print_all_conns():
    """Print every connection reported by ``psutil.net_connections()``."""
    for conn in psutil.net_connections():
        # Parenthesised single-argument form prints identically on
        # Python 2 and is valid syntax on Python 3.
        print(conn)
pyNetstatStatistic.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_conns_by_proc_pid(pid):
    """Print every system connection whose ``pid`` equals ``pid``."""
    for conn in psutil.net_connections():
        if conn.pid == pid:
            # Parenthesised single-argument form prints identically on
            # Python 2 and is valid syntax on Python 3.
            print(conn)
pyNetstatStatistic.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def print_conns_by_remote_addr(addr):
    """Print every system connection whose ``raddr`` contains ``addr``.

    The check is a plain membership test on ``conn.raddr``, so ``addr``
    matches when it equals any element of that value.
    """
    for conn in psutil.net_connections():
        if addr in conn.raddr:
            # Parenthesised single-argument form prints identically on
            # Python 2 and is valid syntax on Python 3.
            print(conn)
manifest_api.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def kill_dnsmasq(config):
    """Kill dnsmasq processes associated with the configured PXE interface.

    Does nothing when ``config['PXE_INTERFACE']`` is None. Otherwise first
    tries the pid stored in ``config['DNSMASQ_PIDFILE']`` (best effort),
    then kills every process holding an IPv4 socket on port 53 or 69 bound
    to the interface address or to 0.0.0.0.
    """
    # There can be more than one bound to it, especially if libvirt was used.
    if config['PXE_INTERFACE'] is None:
        return

    # Easy way: the instance started from a (previous) run of manifest_api.
    try:
        with open(config['DNSMASQ_PIDFILE'], 'r') as pidfile:
            recorded_pid = int(pidfile.read())
        kill_pid(recorded_pid, 'dnsmasq')
    except Exception:   # a valiant effort in vain
        pass

    # Hard way: track down dnsmasq(s) attached to the configured interface.
    pxe_interface = config['PXE_INTERFACE']
    if pxe_interface not in NIF.interfaces():
        return
    inet_entries = NIF.ifaddresses(pxe_interface).get(NIF.AF_INET)
    if inet_entries is None:    # assumes the "torms" address is bound here
        return
    pxe_addr = inet_entries[0]['addr']

    # libvirt bridge will always have DNS (53).  A truly simple net definition
    # won't have TFTP (69) but others might.  Obviously one started from here
    # will have TFTP.  And /etc/init.d/dnsmasq starts a *.* DNS listener
    # which picks up PXE_INTERFACE when explicitly bound processes are killed.
    # net_connections returns objects with a laddr field that is a tuple
    # of (listenaddress, port); filter on that.
    pids = {
        conn.pid
        for conn in psutil.net_connections(kind='inet4')
        if conn.laddr[1] in (53, 69) and
        conn.laddr[0] in (pxe_addr, '0.0.0.0')
    }
    if pids:
        mainapp.logger.info('Killing %d copies of dnsmasq' % len(pids))
    while pids:
        # Until someone starts using bind9...
        kill_pid(pids.pop(), 'dnsmasq')


问题


面经


文章

微信
公众号

扫码关注公众号