def test_procfs_path(self):
    """psutil's procfs-backed APIs must honor PROCFS_PATH: against an
    empty fake procfs directory they should raise instead of silently
    reading the real /proc."""
    fake_procfs = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = fake_procfs
        for fun in (psutil.virtual_memory,
                    psutil.cpu_times,
                    lambda: psutil.cpu_times(percpu=True),
                    psutil.boot_time,
                    # psutil.pids intentionally not checked (as before)
                    psutil.net_connections,
                    psutil.net_io_counters,
                    psutil.net_if_stats,
                    psutil.disk_io_counters,
                    psutil.disk_partitions):
            self.assertRaises(IOError, fun)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(fake_procfs)
# Example source code for Python's psutil.disk_io_counters() (collected snippets)
def monitor(frist_invoke=2):
    """Collect system-wide disk I/O statistics.

    Returns a dict of the form ``{'iostats': {...}}`` with byte counters
    scaled to MiB and time counters scaled from milliseconds to seconds.

    frist_invoke: unused; kept for backward compatibility with existing
    callers (note the pre-existing typo in the parameter name).
    """
    sdiskio = psutil.disk_io_counters()
    mib = 1024 * 1024
    value_dic = {
        'iostats': {
            'io.disks_read': sdiskio.read_bytes / mib,
            'io.disks_write': sdiskio.write_bytes / mib,
            # NOTE(review): the operation *counts* are scaled by 1024*1024
            # exactly like the byte counters above — kept as-is for output
            # compatibility, but the unit looks questionable; confirm with
            # whatever consumes this dict.
            'io.disks_read_count': sdiskio.read_count / mib,
            'io.disks_write_count': sdiskio.write_count / mib,
            'io.disks_read_time': sdiskio.read_time / 1000,
            'io.disks_write_time': sdiskio.write_time / 1000,
            # BUG FIX: this previously reused sdiskio.write_time (copy-paste
            # error).  Use the real busy_time where the platform provides it
            # (Linux, FreeBSD); fall back to the old value elsewhere so the
            # key never disappears.
            'io.disks_busy_time': getattr(sdiskio, 'busy_time',
                                          sdiskio.write_time) / 1000,
        }
    }
    return value_dic
def test_procfs_path(self):
    """Point PROCFS_PATH at an empty directory; system-wide APIs that
    read procfs should then raise IOError."""
    tmp_procfs = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = tmp_procfs
        failing_calls = [
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            # psutil.pids is deliberately left unchecked, as before.
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        ]
        for fun, kwargs in failing_calls:
            self.assertRaises(IOError, fun, **kwargs)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(tmp_procfs)
def test_serialization(self):
    """Every public namedtuple must survive a json and a pickle round trip."""
    def roundtrip(value):
        if json is not None:
            json.loads(json.dumps(value))
        restored = pickle.loads(pickle.dumps(value))
        self.assertEqual(value, restored)
    roundtrip(psutil.Process().as_dict())
    roundtrip(psutil.virtual_memory())
    roundtrip(psutil.swap_memory())
    roundtrip(psutil.cpu_times())
    roundtrip(psutil.cpu_times_percent(interval=0))
    roundtrip(psutil.net_io_counters())
    # disk_io_counters needs /proc/diskstats on Linux and is flaky on
    # AppVeyor; same guard as before, flattened into one condition.
    diskstats_ok = not LINUX or os.path.exists('/proc/diskstats')
    if diskstats_ok and not APPVEYOR:
        roundtrip(psutil.disk_io_counters())
    roundtrip(psutil.disk_partitions())
    roundtrip(psutil.disk_usage(os.getcwd()))
    roundtrip(psutil.users())
def test_procfs_path(self):
    """Redirecting PROCFS_PATH to an empty fake procfs must make
    procfs-backed reads fail rather than fall back to /proc."""
    procdir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = procdir
        plain_funcs = [psutil.virtual_memory,
                       psutil.cpu_times,
                       psutil.boot_time,
                       # psutil.pids: skipped, as in the original
                       psutil.net_connections,
                       psutil.net_io_counters,
                       psutil.net_if_stats,
                       psutil.disk_io_counters,
                       psutil.disk_partitions]
        self.assertRaises(IOError, plain_funcs[0])
        self.assertRaises(IOError, plain_funcs[1])
        self.assertRaises(IOError, psutil.cpu_times, percpu=True)
        for fun in plain_funcs[2:]:
            self.assertRaises(IOError, fun)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(procdir)
def test_serialization(self):
    """Public API return values must be json- and pickle-serializable
    and compare equal after a pickle round trip."""
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        self.assertEqual(ret, pickle.loads(pickle.dumps(ret)))
    for value in (psutil.Process().as_dict(),
                  psutil.virtual_memory(),
                  psutil.swap_memory(),
                  psutil.cpu_times(),
                  psutil.cpu_times_percent(interval=0),
                  psutil.net_io_counters()):
        check(value)
    # Skip disk_io_counters when /proc/diskstats is missing on Linux,
    # or when running on AppVeyor (same guard, different shape).
    if not (LINUX and not os.path.exists('/proc/diskstats')):
        if not APPVEYOR:
            check(psutil.disk_io_counters())
    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def get_disk_io_info(self):
    """Sample per-disk read/write throughput (KiB/s) over a one-second
    window and return {'readiokps': {disk: kps}, 'writeiokps': {...}}.

    On any failure the error is logged via pybixlib / errorInfoDone and
    the (possibly empty) result collected so far is returned.
    """
    result = {'readiokps': {}, 'writeiokps': {}}
    try:
        before = psutil.disk_io_counters(perdisk=True)
        time.sleep(1)
        after = psutil.disk_io_counters(perdisk=True)
        for diskname in before:
            read_delta = after[diskname].read_bytes - before[diskname].read_bytes
            write_delta = after[diskname].write_bytes - before[diskname].write_bytes
            result['readiokps'][diskname] = read_delta / 1024.0
            result['writeiokps'][diskname] = write_delta / 1024.0
    except Exception:
        pybixlib.error(self.logHead + traceback.format_exc())
        self.errorInfoDone(traceback.format_exc())
    return result
def get_metrics(self):
    """Collect per-brick disk metrics for bricks hosted on this peer.

    Takes two per-disk I/O counter snapshots
    STAT_INTERVAL_FOR_PER_SEC_COUNTER seconds apart, then spawns one
    thread per local brick running populate_disk_details, and finally
    returns self.brick_details (filled in by those threads).

    NOTE(review): uses dict.iteritems(), so this is Python 2-only code.
    """
    # First counters snapshot; populate_disk_details presumably diffs it
    # against current_io_stats to derive per-second rates — confirm there.
    self.initial_io_stats = psutil.disk_io_counters(perdisk=True)
    # Resolve the configured peer name to an address so bricks can be
    # matched either by address or by the raw configured name below.
    curr_host_name = socket.gethostbyname(
        self.CONFIG['peer_name']
    )
    time.sleep(self.STAT_INTERVAL_FOR_PER_SEC_COUNTER)
    self.current_io_stats = psutil.disk_io_counters(perdisk=True)
    threads = []
    for volume in self.CLUSTER_TOPOLOGY.get('volumes', []):
        # NOTE(review): the default [] has no iteritems(); this assumes
        # every volume dict actually carries a 'bricks' mapping.
        for sub_volume_index, sub_volume_bricks in volume.get(
            'bricks',
            []
        ).iteritems():
            for brick in sub_volume_bricks:
                brick_hostname = brick['hostname']
                # Only collect for bricks that live on this host.
                if (
                    brick_hostname == curr_host_name or
                    brick_hostname == self.CONFIG['peer_name']
                ):
                    thread = threading.Thread(
                        target=self.populate_disk_details,
                        args=(
                            volume['name'],
                            brick['hostname'],
                            brick['path'],
                        )
                    )
                    thread.start()
                    threads.append(
                        thread
                    )
    # Give each worker at most one second; stragglers are abandoned
    # rather than joined indefinitely.
    for thread in threads:
        thread.join(1)
    for thread in threads:
        del thread
    return self.brick_details
def disk_read():
    """Return the system-wide total bytes read from disk, human-readable."""
    counters = psutil.disk_io_counters()
    return bytes2human(counters.read_bytes)
def disk_written():
    """Return the system-wide total bytes written to disk, human-readable."""
    counters = psutil.disk_io_counters()
    return bytes2human(counters.write_bytes)
def get_stats(self):
    """Refresh the cached CPU/memory/disk/network stats and rebuild
    the aggregate stats list."""
    overall_pct = psutil.cpu_percent(interval=0)
    per_mode = psutil.cpu_times_percent()
    self.cpu_stats = CpuStats(
        overall_pct, per_mode.user, per_mode.system, per_mode.idle)
    self.vmem_stats = psutil.virtual_memory()
    self.disk_stats = psutil.disk_io_counters()
    self.net_stats = psutil.net_io_counters()
    # Named tuples are immutable, so a fresh list must be built on
    # every update rather than mutating the old one in place.
    self.statslist = [
        self.cpu_stats,
        self.vmem_stats,
        self.disk_stats,
        self.net_stats,
    ]
def test_disk_io_counters(self):
    # Smoke test: run psutil.disk_io_counters() through the shared
    # execute() harness and make sure it does not raise.
    self.execute(psutil.disk_io_counters)

# --- net
def test_disk_io_counters_kernel_2_4_mocked(self):
    # Tests /proc/diskstats parsing format for 2.4 kernels, see:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Serve fake procfs content for the two files the parser reads;
        # everything else falls through to the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # 2.4 layout: an extra field before the device name, so
            # 'hda' sits in the 4th column.
            return io.StringIO(
                u(" 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        # Sector counts are converted to bytes via SECTOR_SIZE.
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_full_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # lines reporting all metrics:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Fake the two procfs files the parser reads; anything else is
        # forwarded to the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # 2.6 full layout: device name in the 3rd column followed
            # by the 11 standard I/O stat fields.
            return io.StringIO(
                u(" 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        # Sector counts become bytes via SECTOR_SIZE.
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # where one line of /proc/partitions return a limited
    # amount of metrics when it bumps into a partition
    # (instead of a disk). See:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Fake procfs content; unrelated files go to the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # Partition line: only 4 stat fields after the device name.
            return io.StringIO(
                u(" 3 1 hda 1 2 3 4"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
        self.assertEqual(ret.write_count, 3)
        self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
        # Fields absent from the limited format must default to 0.
        self.assertEqual(ret.read_merged_count, 0)
        self.assertEqual(ret.read_time, 0)
        self.assertEqual(ret.write_merged_count, 0)
        self.assertEqual(ret.write_time, 0)
        self.assertEqual(ret.busy_time, 0)
# =====================================================================
# misc
# =====================================================================
def test_disk_io_counters(self):
    """Check the disk_io_counters() namedtuple: positional layout,
    non-negative values, and no disk/partition duplicates."""
    def check_ntuple(nt):
        # Positional indices must line up with the documented fields.
        self.assertEqual(nt[0], nt.read_count)
        self.assertEqual(nt[1], nt.write_count)
        self.assertEqual(nt[2], nt.read_bytes)
        self.assertEqual(nt[3], nt.write_bytes)
        if not (OPENBSD or NETBSD):
            # OpenBSD/NetBSD do not expose read/write times.
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            if LINUX:
                self.assertEqual(nt[6], nt.read_merged_count)
                self.assertEqual(nt[7], nt.write_merged_count)
                self.assertEqual(nt[8], nt.busy_time)
            elif FREEBSD:
                self.assertEqual(nt[6], nt.busy_time)
        for name in nt._fields:
            # Counters can never be negative.
            assert getattr(nt, name) >= 0, nt
    ret = psutil.disk_io_counters(perdisk=False)
    check_ntuple(ret)
    ret = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(ret), len(set(ret)))
    for key in ret:
        assert key, key
        check_ntuple(ret[key])
        if LINUX and key[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # https://github.com/giampaolo/psutil/issues/338
            while key[-1].isdigit():
                key = key[:-1]
            self.assertNotIn(key, ret.keys())
def host_disk_usage_io_performance_report(self):
    """Emit per-mountpoint disk usage plus I/O rates computed against
    the counters cached from the previous sampling interval."""
    data = []
    counters = psutil.disk_io_counters(perdisk=True)
    for mountpoint, disk in self.disks.items():
        dev = os.path.basename(disk['real_device'])
        report = []
        if dev in self.last_host_disk_io:
            prev = self.last_host_disk_io[dev]
            curr = counters[dev]
            report = {
                'node_id': self.node_id,
                'mountpoint': mountpoint,
                'used': psutil.disk_usage(mountpoint).used,
                'rd_req':
                    (curr.read_count - prev.read_count) / self.interval,
                'rd_bytes':
                    (curr.read_bytes - prev.read_bytes) / self.interval,
                'wr_req':
                    (curr.write_count - prev.write_count) / self.interval,
                'wr_bytes':
                    (curr.write_bytes - prev.write_bytes) / self.interval
            }
        elif not isinstance(self.last_host_disk_io, dict):
            # First run: the cache may not be a dict yet, so reset it.
            self.last_host_disk_io = dict()
        # Always refresh the cached counters for the next interval.
        self.last_host_disk_io[dev] = counters[dev]
        if len(report) > 0:
            data.append(report)
    if len(data) > 0:
        host_collection_performance_emit.disk_usage_io(data=data)
def update(self):
    """Advance the CPU-load and io-wait history rings by one sample,
    then trigger a repaint."""
    a=0
    tot=0
    self.load.append(cpu_percent())
    # NOTE(review): this loop runs exactly once (range covers only the
    # last index), so `a` ends up equal to the newest sample and
    # tot == 1.0 — presumably a leftover from a wider smoothing window.
    for i in range(len(self.load)-1,len(self.load)):
        a=a+self.load[i]
        tot=tot+1.0
    a=a/tot
    self.load[len(self.load)-1]=a
    self.load.pop(0)
    try: #user reported bug, This is a problem with the underlying function.
        # disk_io_counters()[3] is write_bytes in psutil's system-wide
        # tuple — presumably used as an I/O-activity proxy (÷1000);
        # TODO confirm the intended field.
        w_temp=disk_io_counters()[3]/1000
    except:
        # Bare except keeps the widget alive if psutil fails, but also
        # hides real errors; NOTE(review): consider narrowing.
        w_temp=0
    w_delta=w_temp-self.wait_last
    self.wait_last=w_temp
    self.wait.append(int(w_delta))
    #print(w_delta)
    self.wait.pop(0)
    # New samples are drawn in red.
    self.color.append([255,0,0])
    self.color.pop(0)
    self.repaint()
def run(self, *unused):
    """Return per-device disk I/O stats.

    Prefers parsing /proc/diskstats directly when available; otherwise
    falls back to psutil.disk_io_counters(perdisk=True), expanding each
    namedtuple into a plain {field: value} dict per device.
    """
    if(os.path.isfile("/proc/diskstats")):
        return diskstats_parse()
    else:
        results = {}
        try:
            diskdata = psutil.disk_io_counters(perdisk=True)
            for device, values in diskdata.items():
                # Flatten the namedtuple into a regular dict.
                device_stats = {}
                for key_value in values._fields:
                    device_stats[key_value] = getattr(values, key_value)
                results[device] = device_stats
        except Exception as e:
            # NOTE(review): e.message is Python 2-only; on Python 3 this
            # handler itself would raise AttributeError.
            results = e.message
        return results
def getDiskReadWrite():
    """Return [read_time, write_time] from the system-wide disk counters."""
    counters = psutil.disk_io_counters()
    return [counters.read_time, counters.write_time]
def getDiskReadWrite():
    """Fetch the disk I/O time counters as a [read_time, write_time] pair."""
    io_stats = psutil.disk_io_counters()
    return [io_stats.read_time, io_stats.write_time]
def getMethods(self):
    """Expose the disk-related methods on top of the base service's set."""
    own_methods = [
        "disk_usage",
        "disk_io_counters",
    ]
    return ServiceBase.getMethods() + own_methods
def disk_io_counters(self, perfdisk):
    # Thin proxy around psutil.disk_io_counters().
    # NOTE(review): the parameter is spelled "perfdisk" but is passed
    # positionally as psutil's "perdisk" flag (per-disk breakdown when
    # true); the name is kept as-is since callers may pass it by keyword.
    return psutil.disk_io_counters(perfdisk)
def check(self):
    """Return averaged iostat-style metrics across disks.

    NOTE(review): the original comments here were mojibake (encoding-
    damaged); these are rewritten from the code itself.
    """
    # A Linux-specific iostat path exists but is commented out:
    # if platform_util.is_linux():
    # data_per_disk, count = self.get_linux_iostat()
    # else:
    data_per_disk, count = self.get_other_iostat()
    if count:  # count == 0 on the warm-up call of get_other_iostat()
        # Average the summed per-disk metrics over the number of disks,
        # except io.util which is already a per-interval maximum.
        # NOTE(review): iteritems() makes this Python 2-only.
        data = {k: v/count for k,v in data_per_disk.iteritems() if k != 'io.util'}
        data['io.util'] = data_per_disk['io.util']
    else:
        data = data_per_disk
    return data
def get_other_iostat(self):
    """Compute iostat-style rates from two psutil disk-counter snapshots.

    Returns (data_per_disk, count): metrics summed across disks plus the
    number of disks that saw I/O. Returns ({}, 0) on the first call,
    which only primes the cached snapshots.

    NOTE(review): uses dict.iteritems(), so this is Python 2-only code.
    """
    curr_stat = psutil.disk_io_counters(True)  # perdisk=True
    # The interval is measured as summed CPU time per core rather than
    # wall-clock time — presumably to match iostat's accounting; confirm.
    curr_cpu_time = self.sum_cpu_time(psutil.cpu_times()) / self.cpu_count
    if self.last_cpu_time == 0:  # first call: prime snapshots, no data yet
        self.last_stat = curr_stat
        self.last_cpu_time = curr_cpu_time
        return {}, 0
    data_per_disk = {k: 0 for k in self.metric_define}
    count = 0
    ts = curr_cpu_time - self.last_cpu_time
    for disk, nval in curr_stat.iteritems():
        oval = self.last_stat.get(disk)  # previous snapshot for this disk
        if not oval:
            # Disk appeared since the last sample; skip this interval.
            continue
        total_time = nval.write_time - oval.write_time + nval.read_time - oval.read_time
        total_count = nval.write_count - oval.write_count + nval.read_count - oval.read_count
        if not total_count:  # no I/O on this disk during the interval
            continue
        # Per-second request and KiB rates, summed across disks.
        data_per_disk['io.w_s'] += (nval.write_count - oval.write_count) / ts
        data_per_disk['io.wkbyte_s'] += (nval.write_bytes - oval.write_bytes) / 1024 / ts
        data_per_disk['io.r_s'] += (nval.read_count - oval.read_count) / ts
        data_per_disk['io.rkbyte_s'] += (nval.read_bytes - oval.read_bytes) / 1024 / ts
        # Average time per request (iostat's await).
        data_per_disk['io.await'] += total_time / total_count if total_count else 0.0
        if hasattr(oval, 'busy_time'):  # busy_time: Linux, psutil >= 4.0.0
            data_per_disk['io.svctm'] += (nval.busy_time - oval.busy_time) / total_count if total_count else 0.0
            io_util = (nval.busy_time - oval.busy_time) * 100.0 / (ts*1000)
            if io_util > data_per_disk['io.util']:  # keep the busiest disk, capped at 100
                data_per_disk['io.util'] = io_util if io_util < 100 else 100
            data_per_disk['io.queue_time_percent'] = (data_per_disk['io.await'] - data_per_disk['io.svctm']) * 100 / data_per_disk['io.await'] if data_per_disk['io.await'] else 0
        count += 1
    self.last_stat = curr_stat
    self.last_cpu_time = curr_cpu_time
    return data_per_disk, count
def test_disk_io_counters(self):
    # Smoke test: exercise psutil.disk_io_counters() via the shared
    # execute() harness; success means no exception.
    self.execute(psutil.disk_io_counters)

# --- proc
def test_disk_io_counters_kernel_2_4_mocked(self):
    # Tests /proc/diskstats parsing format for 2.4 kernels, see:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Intercept the two procfs files the parser reads and serve
        # canned content; other paths use the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # 2.4 layout: device name in the 4th column.
            return io.StringIO(
                u(" 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        # Sector counts are converted to bytes via SECTOR_SIZE.
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_full_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # lines reporting all metrics:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Fake procfs files; everything else goes to the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # 2.6 full layout: name in the 3rd column, 11 stat fields.
            return io.StringIO(
                u(" 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        # Sector counts become bytes via SECTOR_SIZE.
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)
def test_disk_io_counters_kernel_2_6_limited_mocked(self):
    # Tests /proc/diskstats parsing format for 2.6 kernels,
    # where one line of /proc/partitions return a limited
    # amount of metrics when it bumps into a partition
    # (instead of a disk). See:
    # https://github.com/giampaolo/psutil/issues/767
    def open_mock(name, *args, **kwargs):
        # Canned procfs content; unrelated files use the real open().
        if name == '/proc/partitions':
            return io.StringIO(textwrap.dedent(u"""\
                major minor #blocks name
                8 0 488386584 hda
                """))
        elif name == '/proc/diskstats':
            # Partition line: only 4 stat fields after the name.
            return io.StringIO(
                u(" 3 1 hda 1 2 3 4"))
        else:
            return orig_open(name, *args, **kwargs)
    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock) as m:
        ret = psutil.disk_io_counters()
        assert m.called
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
        self.assertEqual(ret.write_count, 3)
        self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
        # Fields missing from the limited format default to 0.
        self.assertEqual(ret.read_merged_count, 0)
        self.assertEqual(ret.read_time, 0)
        self.assertEqual(ret.write_merged_count, 0)
        self.assertEqual(ret.write_time, 0)
        self.assertEqual(ret.busy_time, 0)
# =====================================================================
# misc
# =====================================================================
def test_disk_io_counters(self):
    """Validate the disk_io_counters() namedtuple layout, value ranges,
    and the absence of disk/partition duplicates in per-disk mode."""
    def check_ntuple(nt):
        # Positional indices must line up with the field names.
        self.assertEqual(nt[0], nt.read_count)
        self.assertEqual(nt[1], nt.write_count)
        self.assertEqual(nt[2], nt.read_bytes)
        self.assertEqual(nt[3], nt.write_bytes)
        if not (OPENBSD or NETBSD):
            # OpenBSD/NetBSD lack the time counters entirely.
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            if LINUX:
                self.assertEqual(nt[6], nt.read_merged_count)
                self.assertEqual(nt[7], nt.write_merged_count)
                self.assertEqual(nt[8], nt.busy_time)
            elif FREEBSD:
                self.assertEqual(nt[6], nt.busy_time)
        for name in nt._fields:
            # All counters must be non-negative.
            assert getattr(nt, name) >= 0, nt
    ret = psutil.disk_io_counters(perdisk=False)
    check_ntuple(ret)
    ret = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(ret), len(set(ret)))
    for key in ret:
        assert key, key
        check_ntuple(ret[key])
        if LINUX and key[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # https://github.com/giampaolo/psutil/issues/338
            while key[-1].isdigit():
                key = key[:-1]
            self.assertNotIn(key, ret.keys())
# can't find users on APPVEYOR or TRAVIS