def get_ips():
    """
    Return a sorted list built from every value of ``get_network()``.

    The per-interface lists are passed through the ``flatten`` helper
    before sorting.

    >>> from psutil._common import snic
    >>> import mock
    >>> MOCK = {
    ...     "awdl0": [snic(family=30, address="fe80::3854:80ff:fe54:7bf8%awdl0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)],
    ...     "en0": [snic(family=2, address="192.168.10.200", netmask="255.255.255.0", broadcast="192.168.10.255", ptp=None),
    ...             snic(family=30, address="fe80::6e40:8ff:feac:4f94%en0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)],
    ...     "bridge0": [snic(family=18, address="6e:40:08:ca:60:00", netmask=None, broadcast=None, ptp=None)],
    ...     "lo0": [snic(family=2, address="127.0.0.1", netmask="255.0.0.0", broadcast=None, ptp=None),
    ...             snic(family=30, address="fe80::1%lo0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)]}
    >>> with mock.patch("psutil.net_if_addrs", side_effect=lambda: MOCK):
    ...     get_ips()
    ['127.0.0.1/255.0.0.0', '192.168.10.200/255.255.255.0']
    """
    per_card_addresses = get_network().values()
    flattened = flatten(chain(per_card_addresses))
    return sorted(flattened)
python类_common()的实例源码
def test_parse_environ_block(self):
    """Check ``parse_environ_block`` on complete, noisy and truncated blocks."""
    from psutil._common import parse_environ_block

    def key(name):
        # Expected keys are upper-cased when the WINDOWS flag is set.
        return name.upper() if WINDOWS else name

    cases = [
        ("a=1\0", {key("a"): "1"}),
        ("a=1\0b=2\0\0", {key("a"): "1", key("b"): "2"}),
        ("a=1\0b=\0\0", {key("a"): "1", key("b"): ""}),
        # ignore everything after \0\0
        ("a=1\0b=2\0\0c=3\0", {key("a"): "1", key("b"): "2"}),
        # ignore everything that is not an assignment
        ("xxx\0a=1\0", {key("a"): "1"}),
        ("a=1\0=b=2\0", {key("a"): "1"}),
        # do not fail if the block is incomplete
        ("a=1\0b=2", {key("a"): "1"}),
    ]
    for block, expected in cases:
        self.assertEqual(parse_environ_block(block), expected)
def test_supports_ipv6(self):
    """
    Exercise ``supports_ipv6()`` in both of its possible outcomes.

    When it reports support, each patched failure (``has_ipv6`` set to
    False, socket creation raising, ``bind`` raising) must make it return
    a falsy value. When it reports no support and the ``socket`` module
    exposes ``AF_INET6``, creating and binding an IPv6 socket is expected
    to raise.
    """
    if supports_ipv6():
        with mock.patch('psutil._common.socket') as s:
            s.has_ipv6 = False
            assert not supports_ipv6()
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.error) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket.bind',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
    elif hasattr(socket, 'AF_INET6'):
        with self.assertRaises(Exception):
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            try:
                sock.bind(("::1", 0))
            finally:
                # Release the descriptor whether or not bind() raised, so
                # the test does not leak an open socket.
                sock.close()
def test_isfile_strict(self):
    """Check ``isfile_strict`` on real paths and on patched ``os.stat`` errors."""
    from psutil._common import isfile_strict

    this_file = os.path.abspath(__file__)
    assert isfile_strict(this_file)
    assert not isfile_strict(os.path.dirname(this_file))

    # EPERM and EACCES raised by os.stat are expected to propagate.
    for code in (errno.EPERM, errno.EACCES):
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(code, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)

    # EINVAL is expected to be reported as "not a file" instead.
    with mock.patch('psutil._common.os.stat',
                    side_effect=OSError(errno.EINVAL, "foo")):
        assert not isfile_strict(this_file)

    with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
        assert not isfile_strict(this_file)
def test_race_condition(self, mock_get_utility, mock_process, mock_net):
    """
    ``self._call(17)`` must be falsy when the process lookup raises.

    This tests a race condition, or permission problem, or OS
    incompatibility in which, for some reason, no process name can be
    found to match the identified listening PID.
    """
    from psutil._common import sconn

    listen_30 = sconn(fd=-1, family=2, type=1, laddr=("0.0.0.0", 30),
                      raddr=(), status="LISTEN", pid=None)
    established = sconn(fd=3, family=2, type=1,
                        laddr=("192.168.5.10", 32783),
                        raddr=("20.40.60.80", 22),
                        status="ESTABLISHED", pid=1234)
    close_wait = sconn(fd=-1, family=10, type=1, laddr=("::1", 54321),
                       raddr=("::1", 111), status="CLOSE_WAIT", pid=None)
    listen_17 = sconn(fd=3, family=2, type=1, laddr=("0.0.0.0", 17),
                      raddr=(), status="LISTEN", pid=4416)

    # We simulate being unable to find the process name of PID 4416,
    # which results in returning False.
    mock_process.side_effect = psutil.NoSuchProcess("No such PID")
    mock_net.return_value = [listen_30, established, close_wait, listen_17]

    self.assertFalse(self._call(17))
    self.assertEqual(mock_get_utility.generic_notification.call_count, 0)
    mock_process.assert_called_once_with(4416)
def test_listening_ipv4(self, mock_get_utility, mock_process, mock_net):
    """
    ``self._call(17)`` must be truthy for the IPv4 LISTEN entry on port 17.

    Also expects exactly one ``mock_get_utility`` call and a single
    process lookup, for PID 4416.
    """
    from psutil._common import sconn

    listen_30 = sconn(fd=-1, family=2, type=1, laddr=("0.0.0.0", 30),
                      raddr=(), status="LISTEN", pid=None)
    established = sconn(fd=3, family=2, type=1,
                        laddr=("192.168.5.10", 32783),
                        raddr=("20.40.60.80", 22),
                        status="ESTABLISHED", pid=1234)
    close_wait = sconn(fd=-1, family=10, type=1, laddr=("::1", 54321),
                       raddr=("::1", 111), status="CLOSE_WAIT", pid=None)
    listen_17 = sconn(fd=3, family=2, type=1, laddr=("0.0.0.0", 17),
                      raddr=(), status="LISTEN", pid=4416)

    mock_net.return_value = [listen_30, established, close_wait, listen_17]
    mock_process.name.return_value = "inetd"

    self.assertTrue(self._call(17))
    self.assertEqual(mock_get_utility.call_count, 1)
    mock_process.assert_called_once_with(4416)
def test_listening_ipv6(self, mock_get_utility, mock_process, mock_net):
    """
    ``self._call(12345)`` must be truthy for the ``::`` LISTEN entry.

    Also expects exactly one ``mock_get_utility`` call and a single
    process lookup, for PID 4420.
    """
    from psutil._common import sconn

    listen_30 = sconn(fd=-1, family=2, type=1, laddr=("0.0.0.0", 30),
                      raddr=(), status="LISTEN", pid=None)
    established = sconn(fd=3, family=2, type=1,
                        laddr=("192.168.5.10", 32783),
                        raddr=("20.40.60.80", 22),
                        status="ESTABLISHED", pid=1234)
    close_wait = sconn(fd=-1, family=10, type=1, laddr=("::1", 54321),
                       raddr=("::1", 111), status="CLOSE_WAIT", pid=None)
    listen_v6 = sconn(fd=3, family=10, type=1, laddr=("::", 12345),
                      raddr=(), status="LISTEN", pid=4420)
    listen_17 = sconn(fd=3, family=2, type=1, laddr=("0.0.0.0", 17),
                      raddr=(), status="LISTEN", pid=4416)

    mock_net.return_value = [
        listen_30, established, close_wait, listen_v6, listen_17]
    mock_process.name.return_value = "inetd"

    self.assertTrue(self._call(12345))
    self.assertEqual(mock_get_utility.call_count, 1)
    mock_process.assert_called_once_with(4420)
def test_parse_environ_block(self):
    """Check ``parse_environ_block`` on complete, noisy and truncated blocks."""
    from psutil._common import parse_environ_block

    def key(name):
        # Expected keys are upper-cased when the WINDOWS flag is set.
        return name.upper() if WINDOWS else name

    cases = [
        ("a=1\0", {key("a"): "1"}),
        ("a=1\0b=2\0\0", {key("a"): "1", key("b"): "2"}),
        ("a=1\0b=\0\0", {key("a"): "1", key("b"): ""}),
        # ignore everything after \0\0
        ("a=1\0b=2\0\0c=3\0", {key("a"): "1", key("b"): "2"}),
        # ignore everything that is not an assignment
        ("xxx\0a=1\0", {key("a"): "1"}),
        ("a=1\0=b=2\0", {key("a"): "1"}),
        # do not fail if the block is incomplete
        ("a=1\0b=2", {key("a"): "1"}),
    ]
    for block, expected in cases:
        self.assertEqual(parse_environ_block(block), expected)
def test_supports_ipv6(self):
    """
    Exercise ``supports_ipv6()`` in both of its possible outcomes.

    When it reports support, each patched failure (``has_ipv6`` set to
    False, socket creation raising, ``bind`` raising) must make it return
    a falsy value. When it reports no support and the ``socket`` module
    exposes ``AF_INET6``, creating and binding an IPv6 socket is expected
    to raise.
    """
    if supports_ipv6():
        with mock.patch('psutil._common.socket') as s:
            s.has_ipv6 = False
            assert not supports_ipv6()
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.error) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket.bind',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
    elif hasattr(socket, 'AF_INET6'):
        with self.assertRaises(Exception):
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            try:
                sock.bind(("::1", 0))
            finally:
                # Release the descriptor whether or not bind() raised, so
                # the test does not leak an open socket.
                sock.close()
def test_isfile_strict(self):
    """Check ``isfile_strict`` on real paths and on patched ``os.stat`` errors."""
    from psutil._common import isfile_strict

    this_file = os.path.abspath(__file__)
    assert isfile_strict(this_file)
    assert not isfile_strict(os.path.dirname(this_file))

    # EPERM and EACCES raised by os.stat are expected to propagate.
    for code in (errno.EPERM, errno.EACCES):
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(code, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)

    # EINVAL is expected to be reported as "not a file" instead.
    with mock.patch('psutil._common.os.stat',
                    side_effect=OSError(errno.EINVAL, "foo")):
        assert not isfile_strict(this_file)

    with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
        assert not isfile_strict(this_file)
def test_net_connections(self):
    """
    Validate ``psutil.net_connections`` for every kind in ``conn_tmap``.

    For each kind the result must contain no duplicates, and every
    connection must have a family and a status type that are allowed;
    the socket type is checked too, except for AF_UNIX connections.
    """
    from psutil._common import conn_tmap

    def check(cons, families, types_):
        # A fresh object() never equals a real family, so the type check
        # always runs on platforms without AF_UNIX.
        unix_family = getattr(socket, 'AF_UNIX', object())
        for entry in cons:
            self.assertIn(entry.family, families, msg=entry)
            if entry.family != unix_family:
                self.assertIn(entry.type, types_, msg=entry)
            self.assertIsInstance(entry.status, (str, unicode))

    for kind, groups in conn_tmap.items():
        if SUNOS and kind == 'unix':
            continue
        families, types_ = groups
        found = psutil.net_connections(kind)
        self.assertEqual(len(found), len(set(found)))
        check(found, families, types_)
def test_parse_environ_block(self):
    """Check ``parse_environ_block`` on complete, noisy and truncated blocks."""
    from psutil._common import parse_environ_block

    def key(name):
        # Expected keys are upper-cased when the WINDOWS flag is set.
        return name.upper() if WINDOWS else name

    cases = [
        ("a=1\0", {key("a"): "1"}),
        ("a=1\0b=2\0\0", {key("a"): "1", key("b"): "2"}),
        ("a=1\0b=\0\0", {key("a"): "1", key("b"): ""}),
        # ignore everything after \0\0
        ("a=1\0b=2\0\0c=3\0", {key("a"): "1", key("b"): "2"}),
        # ignore everything that is not an assignment
        ("xxx\0a=1\0", {key("a"): "1"}),
        ("a=1\0=b=2\0", {key("a"): "1"}),
        # do not fail if the block is incomplete
        ("a=1\0b=2", {key("a"): "1"}),
    ]
    for block, expected in cases:
        self.assertEqual(parse_environ_block(block), expected)
def test_supports_ipv6(self):
    """
    Exercise ``supports_ipv6()`` in both of its possible outcomes.

    When it reports support, each patched failure (``has_ipv6`` set to
    False, socket creation raising, ``bind`` raising) must make it return
    a falsy value. When it reports no support and the ``socket`` module
    exposes ``AF_INET6``, creating and binding an IPv6 socket is expected
    to raise.
    """
    if supports_ipv6():
        with mock.patch('psutil._common.socket') as s:
            s.has_ipv6 = False
            assert not supports_ipv6()
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.error) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
        with mock.patch('psutil._common.socket.socket.bind',
                        side_effect=socket.gaierror) as s:
            assert not supports_ipv6()
            assert s.called
    elif hasattr(socket, 'AF_INET6'):
        with self.assertRaises(Exception):
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            try:
                sock.bind(("::1", 0))
            finally:
                # Release the descriptor whether or not bind() raised, so
                # the test does not leak an open socket.
                sock.close()
def test_isfile_strict(self):
    """Check ``isfile_strict`` on real paths and on patched ``os.stat`` errors."""
    from psutil._common import isfile_strict

    this_file = os.path.abspath(__file__)
    assert isfile_strict(this_file)
    assert not isfile_strict(os.path.dirname(this_file))

    # EPERM and EACCES raised by os.stat are expected to propagate.
    for code in (errno.EPERM, errno.EACCES):
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(code, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)

    # EINVAL is expected to be reported as "not a file" instead.
    with mock.patch('psutil._common.os.stat',
                    side_effect=OSError(errno.EINVAL, "foo")):
        assert not isfile_strict(this_file)

    with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
        assert not isfile_strict(this_file)
def test_net_connections(self):
    """
    Validate ``psutil.net_connections`` for every kind in ``conn_tmap``.

    For each kind the result must contain no duplicates, and every
    connection must have a family and a status type that are allowed;
    the socket type is checked too, except for AF_UNIX connections.
    """
    from psutil._common import conn_tmap

    def check(cons, families, types_):
        # A fresh object() never equals a real family, so the type check
        # always runs on platforms without AF_UNIX.
        unix_family = getattr(socket, 'AF_UNIX', object())
        for entry in cons:
            self.assertIn(entry.family, families, msg=entry)
            if entry.family != unix_family:
                self.assertIn(entry.type, types_, msg=entry)
            self.assertIsInstance(entry.status, (str, unicode))

    for kind, groups in conn_tmap.items():
        if SUNOS and kind == 'unix':
            continue
        families, types_ = groups
        found = psutil.net_connections(kind)
        self.assertEqual(len(found), len(set(found)))
        check(found, families, types_)
def get_network(families=(socket.AF_INET,)):
    """
    Map each network interface name to its "address/netmask" strings.

    Interfaces are read from ``psutil.net_if_addrs()``. Only addresses
    whose ``family`` is contained in *families* are kept; an interface
    with no matching address does not appear in the result.

    :param families: container of address-family constants to keep.
        Defaults to IPv4 only; the default is a tuple so that it cannot be
        mutated between calls.
    :return: plain ``dict`` of interface name -> list of strings.

    >>> from psutil._common import snic
    >>> import mock
    >>> MOCK = {
    ...     "awdl0": [snic(family=30, address="fe80::3854:80ff:fe54:7bf8%awdl0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)],
    ...     "en0": [snic(family=2, address="192.168.10.200", netmask="255.255.255.0", broadcast="192.168.10.255", ptp=None),
    ...             snic(family=30, address="fe80::6e40:8ff:feac:4f94%en0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)],
    ...     "bridge0": [snic(family=18, address="6e:40:08:ca:60:00", netmask=None, broadcast=None, ptp=None)],
    ...     "lo0": [snic(family=2, address="127.0.0.1", netmask="255.0.0.0", broadcast=None, ptp=None),
    ...             snic(family=30, address="fe80::1%lo0", netmask="ffff:ffff:ffff:ffff::", broadcast=None, ptp=None)]}
    >>> with mock.patch("psutil.net_if_addrs", side_effect=lambda: MOCK):
    ...     data_inet = get_network([socket.AF_INET])
    ...     sorted(data_inet.keys())
    ['en0', 'lo0']
    >>> with mock.patch("psutil.net_if_addrs", side_effect=lambda: MOCK):
    ...     sorted(data_inet.values())
    [[u'127.0.0.1/255.0.0.0'], [u'192.168.10.200/255.255.255.0']]
    >>> with mock.patch("psutil.net_if_addrs", side_effect=lambda: MOCK):
    ...     data_inet6 = get_network([socket.AF_INET6])
    ...     sorted(flatten(data_inet6.values()))
    ['fe80::1%lo0/ffff:ffff:ffff:ffff::', 'fe80::3854:80ff:fe54:7bf8%awdl0/ffff:ffff:ffff:ffff::', 'fe80::6e40:8ff:feac:4f94%en0/ffff:ffff:ffff:ffff::']
    """
    nic = psutil.net_if_addrs()
    ips = defaultdict(list)
    for card, addresses in nic.items():
        for address in addresses:
            if address.family in families:
                ips[card].append("{0.address}/{0.netmask}".format(address))
    # Return a plain dict so callers looking up an unknown interface get a
    # KeyError instead of silently creating an empty entry.
    return dict(ips)
def compare_proc_sys_cons(self, pid, proc_cons):
    """
    Assert that *proc_cons* matches the system-wide connections of *pid*.

    Both sides are sorted before comparison, so ordering is irrelevant.
    """
    from psutil._common import pconn

    # Keep only the entries owned by pid and drop their last field
    # (presumably the pid itself) so they line up with proc_cons.
    sys_cons = []
    for entry in psutil.net_connections(kind='all'):
        if entry.pid == pid:
            sys_cons.append(entry[:-1])

    if FREEBSD:
        # on FreeBSD all fds are set to -1
        proc_cons = [pconn(-1, *x[1:]) for x in proc_cons]

    self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def test_net_connections(self):
    """
    Validate ``psutil.net_connections`` for every kind in ``conn_tmap``.

    For each kind the result must contain no duplicates, and every
    connection must have an allowed family; the socket type is checked
    too, except for AF_UNIX connections.
    """
    from psutil._common import conn_tmap

    def check(cons, families, types_):
        # A fresh object() never equals a real family, so the type check
        # always runs on platforms without AF_UNIX.
        unix_family = getattr(socket, 'AF_UNIX', object())
        for entry in cons:
            self.assertIn(entry.family, families, msg=entry)
            if entry.family != unix_family:
                self.assertIn(entry.type, types_, msg=entry)

    for kind, groups in conn_tmap.items():
        if SUNOS and kind == 'unix':
            continue
        families, types_ = groups
        found = psutil.net_connections(kind)
        self.assertEqual(len(found), len(set(found)))
        check(found, families, types_)
def test_not_listening(self, mock_get_utility, mock_process, mock_net):
    """
    ``self._call(17)`` must be falsy when no connection uses port 17.

    Also expects no ``generic_notification`` call and no process lookup.
    """
    from psutil._common import sconn

    listen_30 = sconn(fd=-1, family=2, type=1, laddr=("0.0.0.0", 30),
                      raddr=(), status="LISTEN", pid=None)
    established = sconn(fd=3, family=2, type=1,
                        laddr=("192.168.5.10", 32783),
                        raddr=("20.40.60.80", 22),
                        status="ESTABLISHED", pid=1234)
    close_wait = sconn(fd=-1, family=10, type=1, laddr=("::1", 54321),
                       raddr=("::1", 111), status="CLOSE_WAIT", pid=None)

    mock_net.return_value = [listen_30, established, close_wait]
    mock_process.name.return_value = "inetd"

    self.assertFalse(self._call(17))
    self.assertEqual(mock_get_utility.generic_notification.call_count, 0)
    self.assertEqual(mock_process.call_count, 0)
def compare_proc_sys_cons(self, pid, proc_cons):
    """
    Assert that *proc_cons* matches the system-wide connections of *pid*.

    Both sides are sorted before comparison, so ordering is irrelevant.
    """
    from psutil._common import pconn

    # Keep only the entries owned by pid and drop their last field
    # (presumably the pid itself) so they line up with proc_cons.
    sys_cons = []
    for entry in psutil.net_connections(kind='all'):
        if entry.pid == pid:
            sys_cons.append(entry[:-1])

    if FREEBSD:
        # on FreeBSD all fds are set to -1
        proc_cons = [pconn(-1, *x[1:]) for x in proc_cons]

    self.assertEqual(sorted(proc_cons), sorted(sys_cons))
def compare_proc_sys_cons(self, pid, proc_cons):
    """
    Assert that *proc_cons* matches the system-wide connections of *pid*.

    Both sides are sorted before comparison, so ordering is irrelevant.
    """
    from psutil._common import pconn

    # Keep only the entries owned by pid and drop their last field
    # (presumably the pid itself) so they line up with proc_cons.
    sys_cons = []
    for entry in psutil.net_connections(kind='all'):
        if entry.pid == pid:
            sys_cons.append(entry[:-1])

    if FREEBSD:
        # on FreeBSD all fds are set to -1
        proc_cons = [pconn(-1, *x[1:]) for x in proc_cons]

    self.assertEqual(sorted(proc_cons), sorted(sys_cons))