def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
python类net_connections()的实例源码
def test_net_connections_mocked(self):
def open_mock(name, *args, **kwargs):
if name == '/proc/net/unix':
return io.StringIO(textwrap.dedent(u"""\
0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
000000000000000000000000000000000000000000000000000000
"""))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
psutil.net_connections(kind='unix')
assert m.called
# =====================================================================
# system disk
# =====================================================================
def test_net_connections_mocked(self):
def open_mock(name, *args, **kwargs):
if name == '/proc/net/unix':
return io.StringIO(textwrap.dedent(u"""\
0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
000000000000000000000000000000000000000000000000000000
"""))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
psutil.net_connections(kind='unix')
assert m.called
# =====================================================================
# system disk
# =====================================================================
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def test_net_connections_mocked(self):
def open_mock(name, *args, **kwargs):
if name == '/proc/net/unix':
return io.StringIO(textwrap.dedent(u"""\
0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
000000000000000000000000000000000000000000000000000000
"""))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
psutil.net_connections(kind='unix')
assert m.called
# =====================================================================
# --- system disk
# =====================================================================
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def __init__(self, host="localhost", port=6379):
conn = psutil.net_connections()
ports = [c.laddr[1] for c in conn]
port = max(ports) + 1
self.__port = port
self.__redis_server = Popen(["redis-server", "--port", str(port)])
sleep(1)
self.__worker = [
Process(target=work,
args=(["--host", "localhost", "--port", str(port)],),
daemon=False)
for _ in range(2)
]
for p in self.__worker:
p.start()
super().__init__(host, port)
def get_tcp_info(self):
returnData = {}
try:
conns = psutil.net_connections()
sumConn = {}
for con in conns:
if con.status == 'NONE':
continue
if con.status not in sumConn:
sumConn[con.status] = 0
sumConn[con.status] = sumConn[con.status] + 1
for element in sumConn:
if 'tcpsum' not in returnData:
returnData['tcpsum'] = {}
returnData['tcpsum'][element] = sumConn[element]
except Exception:
pybixlib.error(self.logHead + traceback.format_exc())
self.errorInfoDone(traceback.format_exc())
return returnData
def main():
templ = "%-5s %-30s %-30s %-13s %-6s %s"
print(templ % (
"Proto", "Local address", "Remote address", "Status", "PID",
"Program name"))
proc_names = {}
for p in psutil.process_iter():
try:
proc_names[p.pid] = p.name()
except psutil.Error:
pass
for c in psutil.net_connections(kind='inet'):
laddr = "%s:%s" % (c.laddr)
raddr = ""
if c.raddr:
raddr = "%s:%s" % (c.raddr)
print(templ % (
proto_map[(c.family, c.type)],
laddr,
raddr or AD,
c.status,
c.pid or AD,
proc_names.get(c.pid, '?')[:15],
))
pyGetTCPPortStatisticsOnLocal.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def get_tcp_all_conns_count_top(top=10):
netstat = psutil.net_connections(kind="tcp")
cared_data = list()
for sconn in netstat:
cared_data.append((sconn.laddr[0], sconn.laddr[1]))
cared_data_with_counter = Counter(cared_data)
if len(cared_data) < top:
top = len(cared_data)
tcp_conns_top_x = sorted(cared_data_with_counter.iteritems(), key=lambda x: x[1], reverse=True)
return tcp_conns_top_x[0:top]
def check_ports(ports):
connections = psutil.net_connections()
all_ports = set(
c.laddr[1] for c in connections if isinstance(c.laddr, tuple))
ports_inuse = set(ports) & all_ports
if ports_inuse:
raise TaskException(
'ports {} are in use'.format(', '.join(map(str, ports_inuse))))
def get_free_port(address, initial_port):
"""Find an unused TCP port in a specified range. This should not
be used in misson-critical applications - a race condition may
occur if someone grabs the port before caller of this function
has chance to use it.
Parameters:
address (string): an ip address of interface to use
initial_port (int) : port to start iteration with
Return:
iterator that will return next unused port on a specified
interface
"""
try:
# On OSX this function requires root privileges
psutil.net_connections()
except psutil.AccessDenied:
return count(initial_port)
def _unused_ports():
for port in count(initial_port):
# check if the port is being used
connect_using_port = (
conn
for conn in psutil.net_connections()
if hasattr(conn, 'laddr') and
conn.laddr[0] == address and
conn.laddr[1] == port
)
# only generate unused ports
if not any(connect_using_port):
yield port
return _unused_ports()
def _get_ports_in_use(cls):
"""
Returns a set of ports currently used on localhost.
"""
try:
return set([x.laddr[1] for x in psutil.net_connections(kind='inet4')])
except psutil.AccessDenied:
# On some platforms (such as OS X), root privilege is required to get used ports.
# In that case we avoid port confliction to the best of our knowledge.
_logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
return set()
def test_net_connections(self):
self.execute(psutil.net_connections)
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
# see: https://github.com/giampaolo/psutil/issues/623
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind(("::1", 0))
except socket.error:
pass
psutil.net_connections(kind='inet6')
def compare_proc_sys_cons(self, pid, proc_cons):
from psutil._common import pconn
sys_cons = [c[:-1] for c in psutil.net_connections(kind='all')
if c.pid == pid]
if FREEBSD:
# on FreeBSD all fds are set to -1
proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def pidof(target):
"""pidof(target) -> int list
Get PID(s) of `target`. The returned PID(s) depends on the type of `target`:
- :class:`str`: PIDs of all processes with a name matching `target`.
- :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
- :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
remote end of `target` if it is running on the host. Otherwise an
empty list.
Arguments:
target(object): The target whose PID(s) to find.
Returns:
A list of found PIDs.
"""
if isinstance(target, tubes.sock.sock):
local = target.sock.getsockname()
remote = target.sock.getpeername()
def match(p):
return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')
return [c.pid for c in psutil.net_connections() if match(c)]
elif isinstance(target, tubes.process.process):
return [target.proc.pid]
else:
return pid_by_name(target)
def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None):
"""
Gets Infos about the Prozess associated with the given address information
Both port_src and port_dst must be not None or None at the same time.
return -- [pid, "/path/to/command", "user", "hash"] or []
"""
result = []
if port_src is not None:
pids = [c.pid for c in net_connections()
if len(c.raddr) != 0 and c.pid is not None and
(c.laddr[0], c.raddr[0], c.laddr[1], c.raddr[1]) == (ip_src, ip_dst, port_src, port_dst)]
else:
pids = [c.pid for c in net_connections()
if len(c.raddr) != 0 and c.pid is not None and (c.laddr[0], c.raddr[0]) == (ip_src, ip_dst)]
try:
if len(pids) > 1:
logger.warning("more than 1 matching process: %r", pids)
proc = Process(pids[0])
cmd = [pids[0], proc.cmdline(), proc.username()]
hash_input = "%d%s%s" % (cmd[0], cmd[1], cmd[2])
procinfo_hash = hashlib.sha256(hash_input.encode("UTF-8"))
cmd.append(procinfo_hash)
logger.debug("process info: %r", cmd)
return cmd
except IndexError:
pass
return []
def get_listen_ports(self):
"""
??????
:return: ??????
"""
listen_ports = []
connections = psutil.net_connections()
for connection in connections:
if connection.status == "LISTEN":
listen_ports.append((connection.laddr[1], connection.pid))
return listen_ports
def test_net_connections(self):
self.execute(psutil.net_connections)
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
# see: https://github.com/giampaolo/psutil/issues/623
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind(("::1", 0))
except socket.error:
pass
psutil.net_connections(kind='inet6')
def compare_proc_sys_cons(self, pid, proc_cons):
from psutil._common import pconn
sys_cons = [c[:-1] for c in psutil.net_connections(kind='all')
if c.pid == pid]
if FREEBSD:
# on FreeBSD all fds are set to -1
proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def test_net_connections(self):
self.execute(psutil.net_connections)
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
# see: https://github.com/giampaolo/psutil/issues/623
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind(("::1", 0))
except socket.error:
pass
psutil.net_connections(kind='inet6')
def compare_proc_sys_cons(self, pid, proc_cons):
from psutil._common import pconn
sys_cons = [c[:-1] for c in psutil.net_connections(kind='all')
if c.pid == pid]
if FREEBSD:
# on FreeBSD all fds are set to -1
proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def pidof(target):
"""pidof(target) -> int list
Get PID(s) of `target`. The returned PID(s) depends on the type of `target`:
- :class:`str`: PIDs of all processes with a name matching `target`.
- :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
- :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
remote end of `target` if it is running on the host. Otherwise an
empty list.
Arguments:
target(object): The target whose PID(s) to find.
Returns:
A list of found PIDs.
"""
if isinstance(target, tubes.sock.sock):
local = target.sock.getsockname()
remote = target.sock.getpeername()
def match(p):
return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')
return [c.pid for c in psutil.net_connections() if match(c)]
elif isinstance(target, tubes.process.process):
return [target.proc.pid]
else:
return pid_by_name(target)
pyNetstatStatistic.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def print_all_conns():
system_conns = psutil.net_connections()
for conn in system_conns:
print conn
pyNetstatStatistic.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def print_conns_by_proc_pid(pid):
system_conns = psutil.net_connections()
for conn in system_conns:
if conn.pid == pid:
print conn
pyNetstatStatistic.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def print_conns_by_remote_addr(addr):
system_conns = psutil.net_connections()
for conn in system_conns:
if addr in conn.raddr:
print conn
def kill_dnsmasq(config):
# There can be more than one bound to it, especially if libvirt was used..
# Easy way: started from (previous) run of manifest_api.
if config['PXE_INTERFACE'] is None:
return
try:
with open(config['DNSMASQ_PIDFILE'], 'r') as f:
pid = int(f.read())
kill_pid(pid, 'dnsmasq')
except Exception: # a valiant effort in vain
pass
# Hard way: track down dnsmasq(s) attached to the configured interface.
pxe_interface = config['PXE_INTERFACE']
if pxe_interface not in NIF.interfaces():
return
tmp = NIF.ifaddresses(pxe_interface).get(NIF.AF_INET, None)
if tmp is None: # ASS-U-MES "torms" address bound here
return
pxe_addr = tmp[0]['addr']
# libvirt bridge will always have DNS (53). A truly simple net definition
# won't have TFTP (69) but others might. Obviously one started from here
# will have TFTP. And /etc/init.d/dnsmasq starts a *.* DNS listener
# which picks up PXE_INTERFACE when explicitly bound processes are killed.
# net_connections returns an object with a laddr field that's a tuple
# of (listenaddress, port). Filter on that.
openconns = [(c.laddr[1], c.pid) # port, pid
for c in psutil.net_connections(kind='inet4')
if c.laddr[1] in (53, 69) and (
c.laddr[0] == pxe_addr or c.laddr[0] == '0.0.0.0'
)]
pids = set(c[1] for c in openconns)
if pids:
mainapp.logger.info('Killing %d copies of dnsmasq' % len(pids))
while len(pids):
pid = pids.pop()
kill_pid(pid, 'dnsmasq') # Until someone starts using bind9...