def test_procfs_path(self):
    tdir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = tdir
        self.assertRaises(IOError, psutil.virtual_memory)
        self.assertRaises(IOError, psutil.cpu_times)
        self.assertRaises(IOError, psutil.cpu_times, percpu=True)
        self.assertRaises(IOError, psutil.boot_time)
        # self.assertRaises(IOError, psutil.pids)
        self.assertRaises(IOError, psutil.net_connections)
        self.assertRaises(IOError, psutil.net_io_counters)
        self.assertRaises(IOError, psutil.net_if_stats)
        self.assertRaises(IOError, psutil.disk_io_counters)
        self.assertRaises(IOError, psutil.disk_partitions)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(tdir)

Python net_io_counters() examples
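All of the snippets below build on the same call. As a quick orientation, here is a minimal sketch of psutil.net_io_counters() itself: it returns a namedtuple of cumulative counters since boot, and pernic=True returns a dict keyed by interface name.

import psutil

# System-wide totals.
totals = psutil.net_io_counters()
print(totals.bytes_sent, totals.bytes_recv, totals.errin, totals.dropout)

# Per-interface counters, keyed by NIC name.
for name, counters in psutil.net_io_counters(pernic=True).items():
    print(name, counters.packets_sent, counters.packets_recv)
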
def get_network():
    """Join per-interface I/O counters with the first address of each NIC."""
    network = psutil.net_io_counters(pernic=True)
    ifaces = psutil.net_if_addrs()
    networks = list()
    for k, v in ifaces.items():
        ip = v[0].address
        data = network[k]
        ifnet = dict()
        ifnet['ip'] = ip
        ifnet['iface'] = k
        ifnet['sent'] = '%.2fMB' % (data.bytes_sent / 1024 / 1024)
        ifnet['recv'] = '%.2fMB' % (data.bytes_recv / 1024 / 1024)
        ifnet['packets_sent'] = data.packets_sent
        ifnet['packets_recv'] = data.packets_recv
        ifnet['errin'] = data.errin
        ifnet['errout'] = data.errout
        ifnet['dropin'] = data.dropin
        ifnet['dropout'] = data.dropout
        networks.append(ifnet)
    return networks

def stream_host_stats():
    # Assumes "from os import statvfs" plus the psutil/time imports from the
    # enclosing module. Note that the "memTotal" field below is populated from
    # virtual_memory().free in the original snippet.
    while True:
        net = psutil.net_io_counters(pernic=True)
        time.sleep(1)
        net1 = psutil.net_io_counters(pernic=True)
        net_stat_download = {}
        net_stat_upload = {}
        for k, v in net.items():
            for k1, v1 in net1.items():
                if k1 == k:
                    net_stat_download[k] = (v1.bytes_recv - v.bytes_recv) / 1000.
                    net_stat_upload[k] = (v1.bytes_sent - v.bytes_sent) / 1000.
        ds = statvfs('/')
        disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9,
                    "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9}
        yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \
            % (psutil.cpu_percent(interval=1), psutil.virtual_memory().used,
               psutil.virtual_memory().free, net_stat_download,
               net_stat_upload, disk_str)

def get_network(self):
    usage = 0
    current_network = psutil.net_io_counters()[0]
    if self._previous_network == 0:
        # Check, wait a second, then check again to get a base value.
        self._previous_network = current_network
        time.sleep(2)
        current_network = psutil.net_io_counters()[0]
    time_since = time.time() - self._last_check
    usage = (current_network - self._previous_network)
    self._previous_network = current_network
    self._last_check = time.time()
    return usage

def test_serialization(self):
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        a = pickle.dumps(ret)
        b = pickle.loads(a)
        self.assertEqual(ret, b)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())
    if LINUX and not os.path.exists('/proc/diskstats'):
        pass
    else:
        if not APPVEYOR:
            check(psutil.disk_io_counters())
    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())

def update(self):
    """Function to update the entire class information."""
    self.cpu["percentage"] = psutil.cpu_percent(interval=0.7)
    self.boot = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime(
        "%Y-%m-%d %H:%M:%S")
    virtual_memory = psutil.virtual_memory()
    self.memory["used"] = virtual_memory.used
    self.memory["free"] = virtual_memory.free
    self.memory["cached"] = virtual_memory.cached
    net_io_counters = psutil.net_io_counters()
    self.network["packet_sent"] = net_io_counters.packets_sent
    self.network["packet_recv"] = net_io_counters.packets_recv
    disk_usage = psutil.disk_usage('/')
    self.disk["total"] = int(disk_usage.total / 1024)
    self.disk["used"] = int(disk_usage.used / 1024)
    self.disk["free"] = int(disk_usage.free / 1024)
    self.timestamp = time.time()

def test_nic_names(self):
    p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
    output = p.communicate()[0].strip()
    if p.returncode != 0:
        raise unittest.SkipTest('ifconfig returned no output')
    if PY3:
        output = str(output, sys.stdout.encoding)
    for nic in psutil.net_io_counters(pernic=True).keys():
        for line in output.split():
            if line.startswith(nic):
                break
        else:
            self.fail(
                "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                    nic, output))

# can't find users on APPVEYOR or TRAVIS
def collect_net_stats(self):
    raw_stats = psutil.net_io_counters(pernic=True)
    for key in raw_stats.keys():
        if re.match(r'[\d]+-[\d]+', key) is not None:
            # Interface names like "123-456" are container veth endpoints. The
            # sent/recv fields are deliberately swapped below, presumably
            # because the host-side TX of a veth pair is the container's RX
            # (and vice versa).
            if key not in self.net_stats.keys():
                self.net_stats[key] = {}
                self.net_stats[key]['bytes_sent'] = 0
                self.net_stats[key]['bytes_recv'] = 0
            self.net_stats[key]['bytes_recv_per_sec'] = round(
                (int(raw_stats[key].bytes_sent) - self.net_stats[key]['bytes_recv']) / self.interval)
            self.net_stats[key]['bytes_sent_per_sec'] = round(
                (int(raw_stats[key].bytes_recv) - self.net_stats[key]['bytes_sent']) / self.interval)
            self.net_stats[key]['bytes_recv'] = int(raw_stats[key].bytes_sent)
            self.net_stats[key]['bytes_sent'] = int(raw_stats[key].bytes_recv)
            self.net_stats[key]['packets_recv'] = int(raw_stats[key].packets_sent)
            self.net_stats[key]['packets_sent'] = int(raw_stats[key].packets_recv)
            self.net_stats[key]['errin'] = int(raw_stats[key].errout)
            self.net_stats[key]['errout'] = int(raw_stats[key].errin)
            self.net_stats[key]['dropin'] = int(raw_stats[key].dropout)
            self.net_stats[key]['dropout'] = int(raw_stats[key].dropin)
        else:
            # gateways_stats is assumed to be a dict defined at module level
            # in the original source.
            if key not in gateways_stats.keys():
                gateways_stats[key] = {}
            gateways_stats[key]['bytes_recv'] = int(raw_stats[key].bytes_sent)
            gateways_stats[key]['bytes_sent'] = int(raw_stats[key].bytes_recv)
            gateways_stats[key]['bytes_total'] = gateways_stats[key]['bytes_recv'] + gateways_stats[key]['bytes_sent']
    # logger.info(self.net_stats)

# the main function to collect monitoring data of a container
def check_network(i_warning, i_critical):
    test_int(i_warning, i_critical)
    s_perfdata = ''
    s_output = ''
    i_max = 0
    s_maxdesc = ''
    d_io_counters = psutil.net_io_counters(pernic=True)
    for s_device, nt_counters in d_io_counters.items():
        d_counters = nt_counters._asdict()
        # add all io_counters to perfdata
        for key, value in d_counters.items():
            if 'err' in key or 'drop' in key:
                if value > i_max:
                    i_max = value
                    s_maxdesc = '{} has {} {} packets.'.format(s_device, value, key)
            s_perfdata = add_perfdata(s_perfdata, s_device, key, value)
    s_output = check_status(i_warning, i_critical, i_max)
    if 'OK' not in s_output:
        s_output += s_maxdesc
    s_output += ' | {}'.format(s_perfdata)
    return s_output

def _get_bytes(interface):
    try:
        io_counters = psutil.net_io_counters(pernic=True)
    except AttributeError:
        # Older psutil releases used the network_io_counters() name.
        io_counters = psutil.network_io_counters(pernic=True)
    if_io = io_counters.get(interface)
    if not if_io:
        return None
    return if_io.bytes_recv, if_io.bytes_sent

def _get_interfaces():
    try:
        io_counters = psutil.net_io_counters(pernic=True)
    except AttributeError:
        io_counters = psutil.network_io_counters(pernic=True)
    for interface, data in io_counters.items():
        if data:
            yield interface, data.bytes_recv, data.bytes_sent

def get_networks(self):
    return psutil.net_io_counters(pernic=True)

# From system_utilities.py in the CommunityCellularManager project (facebookincubator).
def get_data(self):
    """Gets system utilization stats."""
    # Get system utilization stats.
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent
    disk_percent = psutil.disk_usage('/').percent
    network_io = psutil.net_io_counters()
    # Compute deltas for sent and received bytes. Note this is system-wide
    # network usage and not necessarily GPRS-related.
    # TODO(matt): query on a specific interface..which one, I'm not sure.
    if self.last_bytes_sent == 0:
        bytes_sent_delta = 0
    else:
        bytes_sent_delta = network_io.bytes_sent - self.last_bytes_sent
    self.last_bytes_sent = network_io.bytes_sent
    if self.last_bytes_received == 0:
        bytes_received_delta = 0
    else:
        bytes_received_delta = (
            network_io.bytes_recv - self.last_bytes_received)
    self.last_bytes_received = network_io.bytes_recv
    return {
        'cpu_percent': cpu_percent,
        'memory_percent': memory_percent,
        'disk_percent': disk_percent,
        'bytes_sent_delta': bytes_sent_delta,
        'bytes_received_delta': bytes_received_delta,
    }

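The TODO above asks how to restrict the delta to a single interface. A minimal sketch using pernic=True follows; the interface name "eth0" is only a placeholder, not part of the original code.

import psutil

def interface_bytes(interface="eth0"):  # "eth0" is a placeholder name
    # Look up the counters for one NIC; returns None if it does not exist.
    counters = psutil.net_io_counters(pernic=True).get(interface)
    if counters is None:
        return None
    return counters.bytes_sent, counters.bytes_recv
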
def network_recv():
    return bytes2human(psutil.net_io_counters().bytes_recv)

def network_sent():
    return bytes2human(psutil.net_io_counters().bytes_sent)

def _update_widgets(self, widgets):
    interfaces = [i for i in netifaces.interfaces() if not i.startswith(self._exclude)]
    del widgets[:]
    counters = psutil.net_io_counters(pernic=True)
    for interface in interfaces:
        if not interface:
            interface = "lo"
        state = "down"
        if len(self.get_addresses(interface)) > 0:
            state = "up"
        if len(self._states["exclude"]) > 0 and state in self._states["exclude"]:
            continue
        if len(self._states["include"]) > 0 and state not in self._states["include"]:
            continue
        data = {
            "rx": counters[interface].bytes_recv,
            "tx": counters[interface].bytes_sent,
        }
        name = "traffic-{}".format(interface)
        if self._showname:
            self.create_widget(widgets, name, interface)
        for direction in ["rx", "tx"]:
            name = "traffic.{}-{}".format(direction, interface)
            widget = self.create_widget(widgets, name, attributes={"theme.minwidth": "1000.00MB"})
            prev = self._prev.get(name, 0)
            speed = bumblebee.util.bytefmt(int(data[direction]) - int(prev))
            widget.full_text(speed)
            self._prev[name] = data[direction]

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
def network(self, all_interface=False):
    """
    all_interface: if true, shows all interface network statistics
    Return: dict
    """
    if all_interface:
        stats = psutil.net_io_counters(pernic=True)
        n = {}
        for k, v in stats.items():
            n[k] = {
                "bytes_sent": self.hr(v.bytes_sent),
                "bytes_recv": self.hr(v.bytes_recv),
                "packets_sent": self.hr(v.packets_sent),
                "packets_recv": self.hr(v.packets_recv),
                "errin": v.errin,
                "errorout": v.errout,
                "dropin": v.dropin,
                "dropout": v.dropout,
            }
    else:
        stats = psutil.net_io_counters()
        n = {
            "bytes_sent": self.hr(stats.bytes_sent),
            "bytes_recv": self.hr(stats.bytes_recv),
            "packets_sent": self.hr(stats.packets_sent),
            "packets_recv": self.hr(stats.packets_recv),
            "errin": stats.errin,
            "errorout": stats.errout,
            "dropin": stats.dropin,
            "dropout": stats.dropout,
        }
    return n

def get_stats(self):
    cpct = psutil.cpu_percent(interval=0)
    ctimes = psutil.cpu_times_percent()
    self.cpu_stats = CpuStats(cpct, ctimes.user, ctimes.system,
                              ctimes.idle)
    self.vmem_stats = psutil.virtual_memory()
    self.disk_stats = psutil.disk_io_counters()
    self.net_stats = psutil.net_io_counters()
    # must create new stats list each time stats are updated
    # because named tuples are immutable
    self.statslist = [self.cpu_stats, self.vmem_stats, self.disk_stats,
                      self.net_stats]

def test_start(self):
    process = psutil.Process()
    self.run_data['start'] = True
    self.run_data['test_status'] = 'running'
    self.run_data['stats'] = {'net:start': json.dumps(psutil.net_io_counters()),
                              'cpu:start': json.dumps(process.cpu_times()),
                              'mem:start': json.dumps(process.memory_info()),
                              'time:start': json.dumps(time.time())}
    return ('ok', None)

def get_stats(self):
    process = psutil.Process()
    self.run_data['stats']['msg_cnt'] = self.msg_cnt
    self.run_data['stats']['net:end'] = json.dumps(psutil.net_io_counters())
    self.run_data['stats']['cpu:end'] = json.dumps(process.cpu_times())
    self.run_data['stats']['mem:end'] = json.dumps(process.memory_info())
    self.run_data['stats']['reconnect_cnt'] = self.reconnect_cnt
    self.run_data['stats']['rate'] = self.run_data['stats']['msg_cnt'] / (
        self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r'])
    return ('ok', self.run_data['stats'])

def reset_stats(self):
    l.info("RESETTING SUB STATS")
    process = psutil.Process()
    self.run_data = {'stats': {}}
    self.run_data['stats'] = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0}
    self.run_data['stats']['net:start'] = json.dumps(psutil.net_io_counters())
    self.run_data['stats']['cpu:start'] = json.dumps(process.cpu_times())
    self.run_data['stats']['mem:start'] = json.dumps(process.memory_info())
    self.run_data['first_msg_time_r'] = 0
    self.run_data['last_msg_time_r'] = 1
    self.msg_cnt = 0
    self.reconnect_cnt = 0
    return ('ok', 'stats reset')

def get_stats(self):
    process = psutil.Process()
    self.run_data['stats']['net']['end'] = psutil.net_io_counters()
    self.run_data['stats']['cpu']['end'] = process.cpu_times()
    self.run_data['stats']['mem']['end'] = process.memory_info()
    duration = self.run_data['last_msg_time'] - self.run_data['first_msg_time']
    if duration == 0:
        self.run_data['rate'] = 0
    else:
        self.run_data['rate'] = self.run_data['msg_cnt'] / duration
    return ('ok', self.run_data)

def reset_stats(self):
    l.info("RESETTING SUB STATS")
    process = psutil.Process()
    self.run_data = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0, 'stats': {}}
    self.run_data['stats']['net'] = {'start': psutil.net_io_counters()}
    self.run_data['stats']['cpu'] = {'start': process.cpu_times()}
    self.run_data['stats']['mem'] = {'start': process.memory_info()}
    self.msg_cnt = 0
    return ('ok', 'stats reset')

def get_stats(self):
    process = psutil.Process()
    self.run_data['stats']['msg_cnt'] = self.msg_cnt
    self.run_data['stats']['net:end'] = json.dumps(psutil.net_io_counters())
    self.run_data['stats']['cpu:end'] = json.dumps(process.cpu_times())
    self.run_data['stats']['mem:end'] = json.dumps(process.memory_info())
    self.run_data['stats']['rate'] = self.run_data['stats']['msg_cnt'] / (
        self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r'])
    return ('ok', self.run_data['stats'])