def test_disk_io_counters(self):
self.execute(psutil.disk_io_counters)
# --- proc
python类disk_io_counters()的实例源码
def test_disk_io_counters_kernel_2_4_mocked(self):
# Tests /proc/diskstats parsing format for 2.4 kernels, see:
# https://github.com/giampaolo/psutil/issues/767
def open_mock(name, *args, **kwargs):
if name == '/proc/partitions':
return io.StringIO(textwrap.dedent(u"""\
major minor #blocks name
8 0 488386584 hda
"""))
elif name == '/proc/diskstats':
return io.StringIO(
u(" 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
ret = psutil.disk_io_counters()
assert m.called
self.assertEqual(ret.read_count, 1)
self.assertEqual(ret.read_merged_count, 2)
self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
self.assertEqual(ret.read_time, 4)
self.assertEqual(ret.write_count, 5)
self.assertEqual(ret.write_merged_count, 6)
self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
self.assertEqual(ret.write_time, 8)
self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_full_mocked(self):
# Tests /proc/diskstats parsing format for 2.6 kernels,
# lines reporting all metrics:
# https://github.com/giampaolo/psutil/issues/767
def open_mock(name, *args, **kwargs):
if name == '/proc/partitions':
return io.StringIO(textwrap.dedent(u"""\
major minor #blocks name
8 0 488386584 hda
"""))
elif name == '/proc/diskstats':
return io.StringIO(
u(" 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
ret = psutil.disk_io_counters()
assert m.called
self.assertEqual(ret.read_count, 1)
self.assertEqual(ret.read_merged_count, 2)
self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
self.assertEqual(ret.read_time, 4)
self.assertEqual(ret.write_count, 5)
self.assertEqual(ret.write_merged_count, 6)
self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
self.assertEqual(ret.write_time, 8)
self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
# Tests /proc/diskstats parsing format for 2.6 kernels,
# where one line of /proc/partitions return a limited
# amount of metrics when it bumps into a partition
# (instead of a disk). See:
# https://github.com/giampaolo/psutil/issues/767
def open_mock(name, *args, **kwargs):
if name == '/proc/partitions':
return io.StringIO(textwrap.dedent(u"""\
major minor #blocks name
8 0 488386584 hda
"""))
elif name == '/proc/diskstats':
return io.StringIO(
u(" 3 1 hda 1 2 3 4"))
else:
return orig_open(name, *args, **kwargs)
orig_open = open
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock) as m:
ret = psutil.disk_io_counters()
assert m.called
self.assertEqual(ret.read_count, 1)
self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
self.assertEqual(ret.write_count, 3)
self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
self.assertEqual(ret.read_merged_count, 0)
self.assertEqual(ret.read_time, 0)
self.assertEqual(ret.write_merged_count, 0)
self.assertEqual(ret.write_time, 0)
self.assertEqual(ret.busy_time, 0)
# =====================================================================
# --- misc
# =====================================================================
def test_disk_io_counters(self):
def check_ntuple(nt):
self.assertEqual(nt[0], nt.read_count)
self.assertEqual(nt[1], nt.write_count)
self.assertEqual(nt[2], nt.read_bytes)
self.assertEqual(nt[3], nt.write_bytes)
if not (OPENBSD or NETBSD):
self.assertEqual(nt[4], nt.read_time)
self.assertEqual(nt[5], nt.write_time)
if LINUX:
self.assertEqual(nt[6], nt.read_merged_count)
self.assertEqual(nt[7], nt.write_merged_count)
self.assertEqual(nt[8], nt.busy_time)
elif FREEBSD:
self.assertEqual(nt[6], nt.busy_time)
for name in nt._fields:
assert getattr(nt, name) >= 0, nt
ret = psutil.disk_io_counters(perdisk=False)
check_ntuple(ret)
ret = psutil.disk_io_counters(perdisk=True)
# make sure there are no duplicates
self.assertEqual(len(ret), len(set(ret)))
for key in ret:
assert key, key
check_ntuple(ret[key])
if LINUX and key[-1].isdigit():
# if 'sda1' is listed 'sda' shouldn't, see:
# https://github.com/giampaolo/psutil/issues/338
while key[-1].isdigit():
key = key[:-1]
self.assertNotIn(key, ret.keys())
# can't find users on APPVEYOR or TRAVIS
def GetSystemState(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8',0))
self.address = s.getsockname()[0]
self.cpu_count = psutil.cpu_count()
self.cpu_percent = psutil.cpu_percent(interval=1,percpu=True)
self.mem = psutil.virtual_memory()
self.disk_usage = psutil.disk_usage('/')
self.disk_io = psutil.disk_io_counters()
self.net_io = psutil.net_io_counters()
self.hostname = socket.gethostname()
def GetSystemState(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8',0))
self.address = s.getsockname()[0]
self.cpu_count = psutil.cpu_count()
self.cpu_percent = psutil.cpu_percent(interval=1,percpu=True)
self.mem = psutil.virtual_memory()
self.disk_usage = psutil.disk_usage('/')
self.disk_io = psutil.disk_io_counters()
self.net_io = psutil.net_io_counters()
self.hostname = socket.gethostname()
def f_print_linux_status(save_as):
###????###################################################################
#scputimes(user=, nice, system, idle, iowait, irq, softirq,steal, guest, guest_nice)
cpu_times = psutil.cpu_times()
#scpustats(ctx_switches, interrupts, soft_interrupts, syscalls)
#cpu_stats = psutil.cpu_stats()
# svmem(total , available, percent, used , free, active, inactive, buffers, cached, shared)
mem = psutil.virtual_memory()
# sswap(total, used, free, percent, sin, sout)
swap = psutil.swap_memory()
#sdiskusage(total, used, free, percent)
#disk_usage = psutil.disk_usage('/')
#sdiskio(read_count, write_count, read_bytes, write_bytes, read_time, write_time)
#disk_io_counters = psutil.disk_io_counters()
#snetio(bytes_sent, bytes_recv, packets_sent, packets_recv, errin, errout, dropin, dropout)
#net = psutil.net_io_counters()
#load
try:
load = os.getloadavg()
except (OSError, AttributeError):
stats = {}
else:
stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}
#Uptime = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")
###????###################################################################
style1 = {1: ' ,6,l', 2: ' ,10,r',3: ' ,6,l', 4: ' ,10,r',5: ' ,6,l', 6: ' ,6,r',7: ' ,8,l',8: ' ,6,r',9: ' ,6,l', 10: ' ,6,r',11: ' ,6,l', 12: ' ,5,r',}
style = {1: ' ,l', 2: ' ,r',3: ' ,l', 4: ' ,r',5: ' ,l', 6: ' ,r',7: ' ,l',8: ' ,r',9: ' ,l', 10: ' ,r',11: ' ,l', 12: ' ,r',}
rows=[
["CPU", str(psutil.cpu_percent(interval=1))+'%',"nice", cpu_times.nice,"MEM", str(mem.percent) + '%',"active", str(mem.active/1024/1024) + 'M',"SWAP", str(swap.percent)+'%',"LOAD", str(psutil.cpu_count())+'core'],
["user", cpu_times.user,"irq", cpu_times.irq,"total", str(mem.total/1024/1024)+'M',"inactive", str(mem.inactive/1024/1024) + 'M',"total", str(swap.total/1024/1024) + 'M',"1 min", stats["min1"]],
["system", cpu_times.system,"iowait", cpu_times.iowait,"used", str(mem.used/1024/1024)+'M',"buffers", str(mem.buffers/1024/1024) + 'M',"used", str(swap.used/1024/1024) + 'M',"5 min", stats["min5"]],
["idle", cpu_times.idle,"steal", cpu_times.steal,"free", str(mem.free/1024/1024) + 'M',"cached", str(mem.cached/1024/1024) + 'M',"free", str(swap.free/1024/1024) + 'M',"15 min", stats["min15"]]
]
title = "Linux Overview"
if save_as == "txt":
f_print_title(title)
f_print_table_body(rows, style1,' ')
elif save_as == "html":
f_print_table_html(rows, title, style)
def check_disk(i_warning, i_critical, s_partition):
"""
Checks for disk stats
Gets:
i_warning: Warning Threshold
i_critical: Critical Threshold
s_partition: partition that should be used to trigger WARNING/CRITICAL
Returns:
check output including perfdata
"""
test_int(i_warning, i_critical)
test_string(s_partition)
s_perfdata = ''
s_output = ''
l_partitions = psutil.disk_partitions()
d_io_counters = psutil.disk_io_counters(perdisk=True)
f_monitored_partition_usage = 0.0
for nt_partition in l_partitions:
# get usage for every partition
d_disk_usage = psutil.disk_usage(nt_partition.mountpoint)._asdict()
# add all usage data to perfdata
for key, value in d_disk_usage.items():
s_perfdata = add_perfdata(s_perfdata, nt_partition.mountpoint, key, value)
# check monitored partition and add status to output
if nt_partition.mountpoint == s_partition:
s_output = check_status(i_warning, i_critical, d_disk_usage['percent'])
f_monitored_partition_usage = d_disk_usage['percent']
# add message if status is not OK
if not 'OK' in s_output:
s_output += ' {} has a usage of {} percent.'.format(s_partition, f_monitored_partition_usage)
# add all the mountpoints and other info to output
for nt_partition in l_partitions:
d_partition = nt_partition._asdict()
for key, value in d_partition.items():
if not key == 'device':
s_output += '\n{}.{}={}'.format(
d_partition['device'],
key,
value
)
for s_device, nt_partition in d_io_counters.items():
d_partition = nt_partition._asdict()
# add all io_counters to perfdata
for key, value in d_partition.items():
s_perfdata = add_perfdata(s_perfdata, s_device, key, value)
# put it all together
s_output += ' | {}'.format(s_perfdata)
return s_output
def poll(interval):
"""Calculate IO usage by comparing IO statics before and
after the interval.
Return a tuple including all currently running processes
sorted by IO activity and total disks I/O activity.
"""
# first get a list of all processes and disk io counters
procs = [p for p in psutil.process_iter()]
for p in procs[:]:
try:
p._before = p.io_counters()
except psutil.Error:
procs.remove(p)
continue
disks_before = psutil.disk_io_counters()
# sleep some time
time.sleep(interval)
# then retrieve the same info again
for p in procs[:]:
try:
p._after = p.io_counters()
p._cmdline = ' '.join(p.cmdline())
if not p._cmdline:
p._cmdline = p.name()
p._username = p.username()
except (psutil.NoSuchProcess, psutil.ZombieProcess):
procs.remove(p)
disks_after = psutil.disk_io_counters()
# finally calculate results by comparing data before and
# after the interval
for p in procs:
p._read_per_sec = p._after.read_bytes - p._before.read_bytes
p._write_per_sec = p._after.write_bytes - p._before.write_bytes
p._total = p._read_per_sec + p._write_per_sec
disks_read_per_sec = disks_after.read_bytes - disks_before.read_bytes
disks_write_per_sec = disks_after.write_bytes - disks_before.write_bytes
# sort processes by total disk IO so that the more intensive
# ones get listed first
processes = sorted(procs, key=lambda p: p._total, reverse=True)
return (processes, disks_read_per_sec, disks_write_per_sec)