def test_procfs_path(self):
    """Check that psutil fails cleanly when PROCFS_PATH is an empty directory.

    ``psutil.PROCFS_PATH`` is pointed at a freshly created temporary
    directory. Each system-wide function listed below must then raise
    ``IOError`` and ``psutil.Process()`` must raise
    ``psutil.NoSuchProcess``. Afterwards ``PROCFS_PATH`` is set back to
    "/proc" and the temporary directory is removed.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = scratch_dir
        # (callable, keyword arguments), checked in this order.
        # psutil.pids is not part of the checks.
        must_raise_ioerror = (
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        )
        for func, kwargs in must_raise_ioerror:
            self.assertRaises(IOError, func, **kwargs)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(scratch_dir)
python类net_if_stats()的实例源码
def get_active_nic_name():
    """Return NIC statistics with the ``docker0`` and ``lo`` entries removed.

    Despite the function name, no "is up" filtering is applied and the
    result is a mapping rather than a single name.

    Returns:
        dict: the mapping produced by ``psutil.net_if_stats()`` (interface
        name -> stats) without the ``docker0`` and ``lo`` keys.
    """
    all_nic = psutil.net_if_stats()
    # pop() with a default silently ignores interfaces that are absent,
    # without resorting to a catch-all ``except``.
    for excluded in ('docker0', 'lo'):
        all_nic.pop(excluded, None)
    return all_nic
def test_procfs_path(self):
    """Verify psutil raises once PROCFS_PATH points at an empty directory.

    A new temporary directory is assigned to ``psutil.PROCFS_PATH``; the
    system-wide calls below are expected to raise ``IOError`` and
    ``psutil.Process()`` to raise ``psutil.NoSuchProcess``. The path is
    reset to "/proc" and the directory deleted when the test ends.
    """
    expect = self.assertRaises
    fake_procfs = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = fake_procfs
        expect(IOError, psutil.virtual_memory)
        expect(IOError, psutil.cpu_times)
        expect(IOError, psutil.cpu_times, percpu=True)
        expect(IOError, psutil.boot_time)
        # psutil.pids is not checked here.
        expect(IOError, psutil.net_connections)
        expect(IOError, psutil.net_io_counters)
        expect(IOError, psutil.net_if_stats)
        expect(IOError, psutil.disk_io_counters)
        expect(IOError, psutil.disk_partitions)
        expect(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(fake_procfs)
def test_procfs_path(self):
    """Exercise psutil with PROCFS_PATH redirected to an empty directory.

    With ``psutil.PROCFS_PATH`` set to a newly created temporary directory,
    the listed system-wide functions must raise ``IOError`` and
    ``psutil.Process()`` must raise ``psutil.NoSuchProcess``. On exit the
    path is set to "/proc" again and the directory is removed.
    """
    empty_dir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = empty_dir
        # Plain calls without arguments, in the order they are checked.
        # cpu_times is additionally checked with percpu=True right after
        # its plain form; psutil.pids is not checked.
        self.assertRaises(IOError, psutil.virtual_memory)
        self.assertRaises(IOError, psutil.cpu_times)
        self.assertRaises(IOError, psutil.cpu_times, percpu=True)
        for func in (
                psutil.boot_time,
                psutil.net_connections,
                psutil.net_io_counters,
                psutil.net_if_stats,
                psutil.disk_io_counters,
                psutil.disk_partitions):
            self.assertRaises(IOError, func)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(empty_dir)
def collect_system_information(self):
    """Collect a snapshot of the host's hardware information.

    Side effect: the name of every network interface that is up and whose
    name does not start with ``lo`` is appended to ``self.nets``.

    Returns:
        dict with the keys ``memory_total``, ``resources_limit``,
        ``cpu_name``, ``cpu`` (``[hz_advertised_raw[0], count]``), ``nets``
        (name -> speed, 1000 when the reported speed is falsy), ``disks``
        (disk name -> total size), ``gpus`` (index -> device dict) and
        ``boot_time``.
    """
    memory = psutil.virtual_memory()

    import cpuinfo
    cpu_details = cpuinfo.get_cpu_info()

    values = {
        'memory_total': memory.total,
        'resources_limit': self.resources_limit,
        'cpu_name': cpu_details['brand'],
        'cpu': [cpu_details['hz_advertised_raw'][0], cpu_details['count']],
        'nets': {},
        'disks': {},
        'gpus': {},
        'boot_time': psutil.boot_time(),
    }

    # GPU discovery is best effort: any failure leaves 'gpus' as collected
    # so far.
    try:
        devices = aetros.cuda_gpu.get_ordered_devices()
        for index, device in enumerate(devices):
            device['available'] = index in self.enabled_gpus
            values['gpus'][index] = device
    except Exception:
        pass

    for partition in psutil.disk_partitions():
        try:
            location = partition[1]
            disk_name = self.get_disk_name(location)
            values['disks'][disk_name] = psutil.disk_usage(location).total
        except Exception:
            # suppress Operation not permitted
            pass

    try:
        for nic_name, nic in psutil.net_if_stats().items():
            if nic_name.startswith('lo') or not nic.isup:
                continue
            self.nets.append(nic_name)
            values['nets'][nic_name] = nic.speed or 1000
    except Exception:
        # suppress Operation not permitted
        pass

    return values
def test_net_if_stats(self):
    """Run ``psutil.net_if_stats`` through this test case's ``execute`` helper."""
    target = psutil.net_if_stats
    self.execute(target)
# --- others
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig`` output.

    For every interface, ``ifconfig <name>`` is run through ``sh``. The
    "is up" flag must match the presence of ``RUNNING`` in the output, and
    the MTU must equal the number following ``MTU:``. Interfaces for which
    ``sh`` raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: '\d' in a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'MTU:(\d+)', out)[0]))
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig`` output.

    For every interface, ``ifconfig <name>`` is run through ``sh``. The
    "is up" flag must match the presence of ``RUNNING`` in the output, and
    the MTU must equal the number following ``mtu ``. Interfaces for which
    ``sh`` raises ``RuntimeError`` are skipped.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: '\d' in a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
def test_net_if_stats(self):
    """Compare psutil's per-NIC stats with what ``ifconfig`` prints.

    Each interface name is passed to ``ifconfig`` via ``sh``; when that
    raises ``RuntimeError`` the interface is skipped. Otherwise ``isup``
    must agree with ``RUNNING`` appearing in the output and ``mtu`` must
    equal the integer after ``mtu `` in the output.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string keeps the regex identical while avoiding the invalid
        # '\d' string escape.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
# =====================================================================
# --- FreeBSD
# =====================================================================
def find_default_interface(self):
    """Look through the list of interfaces for the non-loopback interface.

    Does nothing when ``self.interfaces`` is already set. Otherwise it is
    populated with every interface that is up, is not named ``lo`` and
    whose name does not start with ``ifb``. While more than one candidate
    remains, candidates with zero sent+received packets are dropped, then
    candidates that own the address 127.0.0.1. Any exception raised during
    detection is swallowed.
    """
    import psutil
    try:
        if self.interfaces is not None:
            return
        self.interfaces = {}

        # Look to see which interfaces are up
        link_stats = psutil.net_if_stats()
        for nic in link_stats:
            if nic == 'lo' or nic[:3] == 'ifb' or not link_stats[nic].isup:
                continue
            self.interfaces[nic] = {'packets': 0}

        if len(self.interfaces) > 1:
            # See which interfaces have received data
            counters = psutil.net_io_counters(True)
            for nic in counters:
                if nic in self.interfaces:
                    traffic = counters[nic].packets_sent + counters[nic].packets_recv
                    self.interfaces[nic]['packets'] = traffic
            idle = [nic for nic in self.interfaces
                    if self.interfaces[nic]['packets'] == 0]
            for nic in idle:
                del self.interfaces[nic]

        if len(self.interfaces) > 1:
            # Eliminate any with the loopback address
            addresses = psutil.net_if_addrs()
            loopback = [
                nic for nic in addresses
                if nic in self.interfaces and any(
                    entry.address == '127.0.0.1' for entry in addresses[nic])
            ]
            for nic in loopback:
                del self.interfaces[nic]
    except Exception:
        pass
def test_net_if_stats(self):
    """Hand ``psutil.net_if_stats`` to this test case's ``execute`` helper."""
    func = psutil.net_if_stats
    self.execute(func)
# --- sensors
def test_net_if_stats(self):
    """Cross-check the MTU from ``psutil.net_if_stats()`` with ``ifconfig``.

    For every interface, ``ifconfig <name>`` is run through ``sh`` and the
    MTU must equal the number following ``MTU:`` in its output. Interfaces
    for which ``sh`` raises ``RuntimeError`` are skipped. The "is up" flag
    is not compared because that check is not always reliable.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        # Raw string: '\d' in a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'MTU:(\d+)', out)[0]))
def test_net_if_stats(self):
    """Check psutil's NIC stats against the output of ``ifconfig``.

    ``ifconfig <name>`` is run through ``sh`` for each interface; a
    ``RuntimeError`` from ``sh`` skips that interface. ``isup`` has to
    match whether ``RUNNING`` occurs in the output, and ``mtu`` has to
    equal the number captured after ``mtu ``.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: same pattern, without the invalid '\d' string escape.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
def test_net_if_stats(self):
    """Validate ``psutil.net_if_stats()`` using ``ifconfig`` as reference.

    Every interface is looked up with ``ifconfig <name>`` (through ``sh``).
    If ``sh`` raises ``RuntimeError`` the interface is skipped. Otherwise
    the "is up" flag must equal ``'RUNNING' in out`` and the MTU must equal
    the integer following ``mtu `` in the output.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string literal avoids the invalid '\d' escape warning.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
# =====================================================================
# --- FreeBSD
# =====================================================================
def test_net_if_stats(self):
    """Pass ``psutil.net_if_stats`` to this test case's ``execute`` helper."""
    checked = psutil.net_if_stats
    self.execute(checked)
# --- sensors
def test_net_if_stats(self):
    """Compare the MTU reported by psutil with the one ``ifconfig`` prints.

    ``ifconfig <name>`` is run through ``sh`` for each interface; when
    ``sh`` raises ``RuntimeError`` the interface is skipped. The MTU must
    equal the number following ``MTU:``. The "is up" flag is deliberately
    not asserted because that comparison is not always reliable.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        # Raw string: same pattern, without the invalid '\d' string escape.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'MTU:(\d+)', out)[0]))
def test_net_if_stats(self):
    """Cross-check ``psutil.net_if_stats()`` against ``ifconfig`` output.

    For each interface, ``sh`` runs ``ifconfig <name>``. A ``RuntimeError``
    skips the interface. Otherwise ``isup`` must match the presence of
    ``RUNNING`` in the output and ``mtu`` must equal the number after
    ``mtu ``.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string literal avoids the invalid '\d' escape warning.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
def test_net_if_stats(self):
    """Check psutil's per-interface stats against ``ifconfig``.

    ``ifconfig <name>`` is executed through ``sh`` for every interface;
    interfaces for which it raises ``RuntimeError`` are skipped. The
    "is up" flag must agree with ``RUNNING`` being present in the output
    and the MTU must equal the integer following ``mtu ``.
    """
    for name, stats in psutil.net_if_stats().items():
        try:
            out = sh("ifconfig %s" % name)
        except RuntimeError:
            continue
        self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
        # Raw string: '\d' in a plain literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        self.assertEqual(stats.mtu,
                         int(re.findall(r'mtu (\d+)', out)[0]))
# =====================================================================
# --- FreeBSD
# =====================================================================
def main():
    """Print stats, I/O counters and addresses for every network interface.

    Interfaces are taken from ``psutil.net_if_addrs()``. The stats line and
    the incoming/outgoing lines are printed only when the interface also
    appears in ``psutil.net_if_stats()`` / ``psutil.net_io_counters()``.
    Broadcast, netmask and point-to-point values are printed only when
    truthy. A blank line follows each interface.
    """
    nic_stats = psutil.net_if_stats()
    nic_io = psutil.net_io_counters(pernic=True)
    for nic, addrs in psutil.net_if_addrs().items():
        print("%s:" % (nic))

        if nic in nic_stats:
            st = nic_stats[nic]
            print(" stats : ", end='')
            up_text = "yes" if st.isup else "no"
            print("speed=%sMB, duplex=%s, mtu=%s, up=%s" % (
                st.speed, duplex_map[st.duplex], st.mtu, up_text))

        if nic in nic_io:
            counters = nic_io[nic]
            directions = (
                (" incoming : ", counters.bytes_recv, counters.packets_recv,
                 counters.errin, counters.dropin),
                (" outgoing : ", counters.bytes_sent, counters.packets_sent,
                 counters.errout, counters.dropout),
            )
            for label, nbytes, pkts, errs, drops in directions:
                print(label, end='')
                print("bytes=%s, pkts=%s, errs=%s, drops=%s" % (
                    nbytes, pkts, errs, drops))

        for addr in addrs:
            # Fall back to the raw family value when it has no label.
            family_label = af_map.get(addr.family, addr.family)
            print(" %-4s" % family_label, end="")
            print(" address : %s" % addr.address)
            optional_rows = (
                (" broadcast : %s", addr.broadcast),
                (" netmask : %s", addr.netmask),
                (" p2p : %s", addr.ptp),
            )
            for template, value in optional_rows:
                if value:
                    print(template % value)
        print("")
def _get_cali_veth_stat(self):
    """Report how many ``cali*`` and ``tmp*`` interfaces are up and down.

    Interfaces returned by ``psutil.net_if_stats()`` are grouped by name
    prefix (``cali`` or ``tmp``); other interfaces are ignored. For each
    group the up, down and total counts are appended to ``self._result``
    as ``GraphiteData`` points named
    ``lain.cluster.calico.veth.<prefix>.<up|down|total>``, in the order
    cali up/down/total followed by tmp up/down/total.
    """
    prefixes = ('cali', 'tmp')
    states = ('up', 'down', 'total')
    counts = dict(
        (prefix, dict((state, 0) for state in states))
        for prefix in prefixes)

    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for name, stat in psutil.net_if_stats().items():
        for prefix in prefixes:
            if name.startswith(prefix):
                group = counts[prefix]
                group['total'] += 1
                group['up' if stat.isup else 'down'] += 1
                break

    for prefix in prefixes:
        for state in states:
            self._result.append(
                GraphiteData(
                    "lain.cluster.calico.veth.%s.%s" % (prefix, state),
                    self._endpoint, counts[prefix][state],
                    self._step, "val"))
def get_eth_info(self):
    """Collect name, MAC, IPv4 and link information for every interface.

    One record is built per interface reported by
    ``psutil.net_if_addrs()``. An interface carrying several IPv4 addresses
    yields one record per address. Address families other than link-layer
    and IPv4 (e.g. IPv6) are ignored. Records whose name contains ``:``
    (alias style, such as ``eth0:1``) and that still have the placeholder
    MAC inherit the MAC of the record named like the part before the last
    ``:``.

    Returns:
        list of dict: records sorted by ``name``, each with the keys
        ``name``, ``mac``, ``ip``, ``mask``, ``broadcast``, ``status``
        (``'Active'``, ``'Inactive'`` or ``'Unknown'``) and ``speed``.
    """
    data = []
    addrs = psutil.net_if_addrs()
    stats = psutil.net_if_stats()
    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for name, entries in addrs.items():
        eth = {
            'name': name,
            'mac': '00:00:00:00:00:00',
            'ip': '',
            'mask': '',
            'broadcast': '',
            'status': 'Unknown',
            'speed': 0
        }
        try:
            # Byte-string names are decoded as GBK (presumably for Windows
            # interface names - confirm).
            eth['name'] = name.decode('gbk')
        except (AttributeError, UnicodeError):
            # Text strings have no decode() on Python 3, and undecodable
            # names are kept as they are.
            eth['name'] = name
        if name in stats:
            eth['status'] = 'Active' if int(stats[name].isup) else 'Inactive'
            eth['speed'] = stats[name].speed
        for entry in entries:
            if entry.family == psutil.AF_LINK:
                eth['mac'] = entry.address.upper()
            elif entry.family == socket.AF_INET:
                if eth['ip']:
                    # A further IPv4 address on the same interface: store
                    # the record built so far before overwriting it.
                    data.append(copy.deepcopy(eth))
                eth['ip'] = entry.address
                eth['mask'] = entry.netmask
                eth['broadcast'] = entry.broadcast
            # Any other family (e.g. IPv6) is ignored.
        data.append(eth)

    # Alias records that kept the placeholder MAC take the MAC of the first
    # record named like their base interface.
    for item in data:
        if ':' not in item['name']:
            continue
        if item['mac'] != '00:00:00:00:00:00':
            continue
        base_name = item['name'].rsplit(':', 1)[0]
        for candidate in data:
            if base_name == candidate['name']:
                item['mac'] = candidate['mac']
                break

    data.sort(key=lambda x: x['name'])
    return data
def test_net_if_addrs(self):
    """Validate the structure and contents of ``psutil.net_if_addrs()``.

    Checks field types, uniqueness of each NIC's address list, that every
    address family is one of AF_INET / AF_INET6 / psutil.AF_LINK, that
    IPv4/IPv6 addresses of interfaces that are up can be bound, that
    non-IPv6 address fields pass ``check_net_address``, that broadcast and
    point-to-point addresses are mutually exclusive, and finally the value
    of ``psutil.AF_LINK`` per platform.
    """
    addrs_by_nic = psutil.net_if_addrs()
    assert addrs_by_nic, addrs_by_nic

    link_stats = psutil.net_if_stats()

    # NIC names are not compared with net_io_counters(pernic=True): that is
    # not reliable on all platforms (net_if_addrs() reports more
    # interfaces).

    known_families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
    str_or_none = (str, type(None))
    for nic, addrs in addrs_by_nic.items():
        self.assertIsInstance(nic, (str, unicode))
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, str_or_none)
            self.assertIsInstance(addr.broadcast, str_or_none)
            self.assertIn(addr.family, known_families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)

            if link_stats[nic].isup:
                # Do not test binding to addresses of interfaces
                # that are down
                if addr.family == socket.AF_INET:
                    with contextlib.closing(
                            socket.socket(addr.family)) as sock:
                        sock.bind((addr.address, 0))
                elif addr.family == socket.AF_INET6:
                    af, socktype, proto, _canonname, sockaddr = \
                        socket.getaddrinfo(
                            addr.address, 0, socket.AF_INET6,
                            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as sock:
                        sock.bind(sockaddr)

            # TODO: skip AF_INET6 for now because I get:
            # AddressValueError: Only hex digits permitted in
            # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
            if addr.family != AF_INET6:
                for ip in (addr.address, addr.netmask, addr.broadcast,
                           addr.ptp):
                    if ip is not None:
                        check_net_address(ip, addr.family)

            # broadcast and ptp addresses are mutually exclusive
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Sanity-check every address returned by ``psutil.net_if_addrs()``.

    Per NIC: the name is a string and the address list has no duplicates.
    Per address: field types, a known address family, bindability of
    IPv4/IPv6 addresses when the NIC is up, ``check_net_address`` on all
    non-IPv6 fields, and mutual exclusion of broadcast and point-to-point
    addresses. Lastly ``psutil.AF_LINK`` is compared with the platform's
    expected constant.
    """
    nic_addrs = psutil.net_if_addrs()
    assert nic_addrs, nic_addrs

    nic_stats = psutil.net_if_stats()

    # Comparing the NIC names with net_io_counters(pernic=True) is not
    # reliable on all platforms (net_if_addrs() reports more interfaces),
    # so no such comparison is made.

    families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
    optional_str = (str, type(None))
    for nic, addrs in nic_addrs.items():
        self.assertIsInstance(nic, (str, unicode))
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, optional_str)
            self.assertIsInstance(addr.broadcast, optional_str)
            self.assertIn(addr.family, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)

            # Do not test binding to addresses of interfaces
            # that are down
            if nic_stats[nic].isup:
                if addr.family == socket.AF_INET:
                    ipv4_sock = socket.socket(addr.family)
                    with contextlib.closing(ipv4_sock):
                        ipv4_sock.bind((addr.address, 0))
                elif addr.family == socket.AF_INET6:
                    first_info = socket.getaddrinfo(
                        addr.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    af, socktype, proto, _canonname, sockaddr = first_info
                    ipv6_sock = socket.socket(af, socktype, proto)
                    with contextlib.closing(ipv6_sock):
                        ipv6_sock.bind(sockaddr)

            # TODO: skip AF_INET6 for now because I get:
            # AddressValueError: Only hex digits permitted in
            # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
            if addr.family != AF_INET6:
                fields = (addr.address, addr.netmask, addr.broadcast,
                          addr.ptp)
                for ip in fields:
                    if ip is not None:
                        check_net_address(ip, addr.family)

            # broadcast and ptp addresses are mutually exclusive
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Validate the addresses reported by ``psutil.net_if_addrs()``.

    For each NIC the address list must be free of duplicates. For each
    address: field types and address family are checked, IPv4/IPv6
    addresses of NICs that are up must be bindable, all non-IPv6 fields go
    through ``check_net_address``, and broadcast and point-to-point
    addresses must not both be set. The value of ``psutil.AF_LINK`` is then
    checked per platform.
    """
    addrs_by_nic = psutil.net_if_addrs()
    assert addrs_by_nic, addrs_by_nic

    stats_by_nic = psutil.net_if_stats()

    # The set of NIC names is not compared with
    # net_io_counters(pernic=True): not reliable on all platforms
    # (net_if_addrs() reports more interfaces).

    allowed_families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
    maybe_str = (str, type(None))
    for nic, addrs in addrs_by_nic.items():
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, maybe_str)
            self.assertIsInstance(addr.broadcast, maybe_str)
            self.assertIn(addr.family, allowed_families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)

            if stats_by_nic[nic].isup:
                # Do not test binding to addresses of interfaces
                # that are down
                if addr.family == socket.AF_INET:
                    with contextlib.closing(
                            socket.socket(addr.family)) as sock:
                        sock.bind((addr.address, 0))
                elif addr.family == socket.AF_INET6:
                    af, socktype, proto, _canonname, sockaddr = \
                        socket.getaddrinfo(
                            addr.address, 0, socket.AF_INET6,
                            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as sock:
                        sock.bind(sockaddr)

            # TODO: skip AF_INET6 for now because I get:
            # AddressValueError: Only hex digits permitted in
            # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
            if addr.family != AF_INET6:
                for ip in (addr.address, addr.netmask, addr.broadcast,
                           addr.ptp):
                    if ip is not None:
                        check_net_address(ip, addr.family)

            # broadcast and ptp addresses are mutually exclusive
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)