def test_disks(self):
    """Cross-check psutil's partition and usage data against ``df -k``.

    For every partition returned by ``psutil.disk_partitions(all=False)``
    the device name and total size must equal what ``df`` prints, while
    the free and used figures may differ by at most 10 MB.
    """
    max_drift = 10 * 1024 * 1024  # 10 MB tolerance

    def df(path):
        """Return ``(device, total, used, free)`` for *path*, sizes in bytes."""
        rows = sh('df -k "%s"' % path).strip().split('\n')
        # rows[0] is the column header; rows[1] carries the figures.
        dev, total, used, free = rows[1].split()[:4]
        if dev == 'none':
            dev = ''
        # The parsed sizes are multiplied by 1024 to obtain bytes.
        return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        df_dev, df_total, df_used, df_free = df(part.mountpoint)
        self.assertEqual(part.device, df_dev)
        self.assertEqual(usage.total, df_total)
        # Free is checked before used, each against the same tolerance.
        for ps_value, df_value in ((usage.free, df_free),
                                   (usage.used, df_used)):
            if abs(ps_value - df_value) > max_drift:
                self.fail("psutil=%s, df=%s" % (ps_value, df_value))
# python类disk_partitions()的实例源码 (example source code using Python's disk_partitions())
def update_disks(self):
    """Rebuild ``self.disks`` from the partitions psutil currently reports.

    The mapping is cleared first and then filled with one entry per mount
    point holding the device, filesystem type, mount options and usage
    figures.  ``real_device`` equals ``device`` unless the device path is
    a symlink, in which case it is the resolved path.
    """
    self.disks.clear()
    for part in psutil.disk_partitions(all=False):
        mount = part.mountpoint
        stats = psutil.disk_usage(mount)
        entry = {
            'device': part.device,
            'real_device': part.device,
            'fstype': part.fstype,
            'opts': part.opts,
            'total': stats.total,
            'used': stats.used,
            'free': stats.free,
            'percent': stats.percent,
        }
        self.disks[mount] = entry
        if os.path.islink(part.device):
            # Follow the symlink so callers can see the underlying path.
            self.disks[mount]['real_device'] = os.path.realpath(part.device)
# ?????????????? ??? ???
def partitions():
    """Return whatever ``psutil.disk_partitions()`` reports."""
    mounted = psutil.disk_partitions()
    return mounted
def getLocation():
    """Return the mount point of the first mounted removable-disk partition.

    Block devices of type ``disk`` whose ``removable`` attribute is ``"1"``
    are enumerated through pyudev.  The device nodes of their partitions
    are then matched against the devices psutil reports as mounted, and
    the first match is printed as ``<device> => <mountpoint>``.

    Returns:
        The mount point of the first match, or ``None`` when no partition
        of a removable disk is mounted.
    """
    context = pyudev.Context()
    removable_disks = [
        disk
        for disk in context.list_devices(subsystem='block', DEVTYPE='disk')
        if disk.attributes.asstring('removable') == "1"
    ]
    for disk in removable_disks:
        # Distinct loop names: the comprehension no longer shadows the
        # outer device variable it is filtered by.
        nodes = {
            child.device_node
            for child in context.list_devices(
                subsystem='block', DEVTYPE='partition', parent=disk)
        }
        for part in psutil.disk_partitions():
            if part.device in nodes:
                mntpoint = part.mountpoint
                print(part.device + ' => ' + mntpoint)
                return mntpoint
    # Nothing matched: make the "not found" result explicit.
    return None
def run(self, config):
    """Collect per-partition disk usage, preferring psutil over ``df``.

    Args:
        config: object exposing ``get(section, option)``.  When the
            ``diskusage`` / ``force_df`` option equals ``'yes'`` the
            ``df -Pl`` code path is used even if psutil returned data.

    Returns:
        ``{'df-psutil': [...]}`` with one dict per partition.  Each dict
        has an ``info`` item plus the usage fields.  On the ``df`` path
        ``info`` is ``[device, mountpoint, '', '']`` and ``percent`` is
        the string printed by ``df`` without its trailing character.
    """
    disk = {'df-psutil': []}

    for part in psutil.disk_partitions(False):
        if os.name == 'nt' and ('cdrom' in part.opts or part.fstype == ''):
            # skip cd-rom drives with no disk in it; they may raise
            # ENOENT, pop-up a Windows GUI error for a non-ready
            # partition or just hang.
            continue
        try:
            usage = psutil.disk_usage(part.mountpoint)
        except Exception:
            # Best effort: a partition that cannot be queried is skipped.
            # Unlike a bare ``except``, KeyboardInterrupt and SystemExit
            # still propagate.
            continue
        diskdata = {'info': part}
        for key in usage._fields:
            diskdata[key] = getattr(usage, key)
        disk['df-psutil'].append(diskdata)

    try:
        force_df = config.get('diskusage', 'force_df')
    except Exception:
        # A missing section/option (or unusable config) means "not forced".
        force_df = 'no'

    if not disk['df-psutil'] or force_df == 'yes':
        try:
            disk['df-psutil'] = []
            # The context manager closes the pipe once the output is read.
            with os.popen("df -Pl") as pipe:
                rows = [line.split() for line in pipe.read().splitlines()]
            # rows[0] is the header line.
            for row in rows[1:]:
                if row[0] == 'tmpfs':
                    continue
                disk['df-psutil'].append({
                    'info': [row[0], row[5], '', ''],
                    'total': int(row[1]) * 1024,
                    'used': int(row[2]) * 1024,
                    'free': int(row[3]) * 1024,
                    'percent': row[4][:-1],
                })
        except Exception:
            # Best effort: keep whatever was parsed before the failure.
            pass
    return disk
def getAttrRO(self):
    """Return ``ServiceBase.getAttrRO()`` extended with ``disk_partitions``."""
    inherited = ServiceBase.getAttrRO()
    return inherited + ['disk_partitions']
def get_disk_partitions(self):
    """Return whatever ``psutil.disk_partitions()`` reports."""
    partitions = psutil.disk_partitions()
    return partitions
def monitor(frist_invoke=1):
    """Report usage for each partition psutil lists, keyed by mount point.

    ``frist_invoke`` is accepted but not read by this function.

    Returns:
        ``{'disk': {mount_point: {...}}}`` where every inner dict carries
        the device, mount point, filesystem type, usage percentage and
        the total/used/free sizes divided by ``1024 * 1024``.
    """
    mib = 1024 * 1024
    per_mount = {}
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt' and ('cdrom' in part.opts or part.fstype == ''):
            # skip cd-rom drives with no disk in it; they may raise
            # ENOENT, pop-up a Windows GUI error for a non-ready
            # partition or just hang.
            continue
        mount_point = part.mountpoint
        usage = psutil.disk_usage(mount_point)
        per_mount[mount_point] = {
            'disk.device': part.device,
            'disk.mount_point': mount_point,
            'disk.total': usage.total / mib,
            'disk.used': usage.used / mib,
            'disk.free': usage.free / mib,
            'disk.percent': usage.percent,
            'disk.type': part.fstype,
        }
    return {'disk': per_mount}
def check(self):
    """Aggregate disk usage over every partition psutil reports.

    Partitions whose options string, upper-cased, is exactly ``CDROM`` or
    ``REMOVABLE`` are skipped.  For the rest the function tracks the
    highest usage percentage, the largest used size and the smallest free
    size (each with its mount point) and sums used/free/total.

    Returns:
        A dict with the ``disk.*`` keys initialised below.  Size values
        are divided by 1024 before returning; ``disk.used_percent`` is
        computed from the summed sizes, or ``0`` when the total is zero.
    """
    data = {
        'disk.max_used_percent': 0,
        'disk.max_used_percent_mount': '',
        'disk.max_used': 0,
        'disk.max_used_mount': '',
        'disk.min_free': 0,
        'disk.min_free_mount': '',
        'disk.used_percent': 0,
        'disk.used': 0,
        'disk.free': 0,
        'disk.total': 0,
    }
    for part in psutil.disk_partitions(all=True):
        if part.opts.upper() in ('CDROM', 'REMOVABLE'):
            continue
        mount = part.mountpoint
        usage = psutil.disk_usage(mount)

        if usage.percent > data['disk.max_used_percent']:
            data.update({'disk.max_used_percent': usage.percent,
                         'disk.max_used_percent_mount': mount})
        if usage.used > data['disk.max_used']:
            data.update({'disk.max_used': usage.used,
                         'disk.max_used_mount': mount})
        # NOTE(review): 0 doubles as the "not set yet" marker, so a
        # partition with zero free space is replaced by the next one seen.
        if data['disk.min_free'] == 0 or usage.free < data['disk.min_free']:
            data.update({'disk.min_free': usage.free,
                         'disk.min_free_mount': mount})

        data['disk.used'] += usage.used
        data['disk.free'] += usage.free
        data['disk.total'] += usage.total

    # The percentage is derived from the raw sums, before rescaling.
    if data['disk.total']:
        data['disk.used_percent'] = data['disk.used'] * 100 / data['disk.total']
    else:
        data['disk.used_percent'] = 0

    for key in ('disk.max_used', 'disk.min_free',
                'disk.used', 'disk.free', 'disk.total'):
        data[key] = data[key] / 1024
    return data
def test_disk_partitions(self):
    """Pass ``psutil.disk_partitions`` to ``self.execute``."""
    target = psutil.disk_partitions
    self.execute(target)
def test_disks(self):
    """Cross-check psutil's partition and usage data against ``df -k``.

    For every partition returned by ``psutil.disk_partitions(all=False)``
    the device name and total size must equal what ``df`` prints, while
    the free and used figures may differ by at most 10 MB.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB

    def df(path):
        """Return ``(device, total, used, free)`` for *path*, sizes in bytes."""
        out = sh('df -k "%s"' % path).strip()
        # Line 0 is the column header; line 1 carries the figures.
        line = out.split('\n')[1]
        dev, total, used, free = line.split()[:4]
        if dev == 'none':
            dev = ''
        # The parsed sizes are multiplied by 1024 to obtain bytes.
        return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        dev, total, used, free = df(part.mountpoint)
        self.assertEqual(part.device, dev)
        self.assertEqual(usage.total, total)
        if abs(usage.free - free) > tolerance:
            # Both values go in one tuple: "%" binds tighter than the
            # comma, so passing them separately raises TypeError.
            self.fail("psutil=%s, df=%s" % (usage.free, free))
        if abs(usage.used - used) > tolerance:
            self.fail("psutil=%s, df=%s" % (usage.used, used))
# --- cpu
def test_disks(self):
    """Compare psutil's disk figures with WMI's ``Win32_LogicalDisk``.

    Each psutil partition is paired with the WMI disk whose ``DeviceID``
    equals the psutil device with backslashes removed.  The total size
    must match exactly and the free space within 10 MB.  The test fails
    if a psutil partition has no WMI counterpart.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB
    ps_parts = psutil.disk_partitions(all=True)
    wmi_parts = wmi.WMI().Win32_LogicalDisk()
    for ps_part in ps_parts:
        for wmi_part in wmi_parts:
            if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                continue
            if not ps_part.mountpoint:
                # this is usually a CD-ROM with no disk inserted
                break
            try:
                usage = psutil.disk_usage(ps_part.mountpoint)
            except OSError as err:
                if err.errno == errno.ENOENT:
                    # usually this is the floppy
                    break
                raise
            self.assertEqual(usage.total, int(wmi_part.Size))
            wmi_free = int(wmi_part.FreeSpace)
            # Free space is compared with a tolerance only: an exact
            # equality check would make the tolerance below unreachable.
            if abs(usage.free - wmi_free) > tolerance:
                self.fail("psutil=%s, wmi=%s" % (usage.free, wmi_free))
            break
        else:
            # The inner loop finished without a ``break``: no WMI match.
            self.fail("can't find partition %s" % repr(ps_part))
def test_disk_usage(self):
    """Compare ``psutil.disk_usage()`` with ``df -k`` for each partition.

    Sizes may differ by up to 4 MB and the usage percentage by up to 1.
    Partitions for which ``df`` fails with a "no such file or directory"
    or "raw devices not supported" error are skipped.
    """
    def df(device):
        """Return ``(total, used, free, percent)`` parsed from ``df -k``."""
        output = sh("df -k %s" % device).strip()
        # Line 0 is the column header; line 1 carries the figures.
        columns = output.split('\n')[1].split()
        total = int(columns[1]) * 1024
        used = int(columns[2]) * 1024
        free = int(columns[3]) * 1024
        percent = float(columns[4].replace('%', ''))
        return (total, used, free, percent)

    tolerance = 4 * 1024 * 1024  # 4MB
    skippable = ("no such file or directory", "raw devices not supported")
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        try:
            total, used, free, percent = df(part.device)
        except RuntimeError as err:
            # see:
            # https://travis-ci.org/giampaolo/psutil/jobs/138338464
            # https://travis-ci.org/giampaolo/psutil/jobs/138343361
            message = str(err).lower()
            if any(text in message for text in skippable):
                continue
            raise
        self.assertAlmostEqual(usage.total, total, delta=tolerance)
        self.assertAlmostEqual(usage.used, used, delta=tolerance)
        self.assertAlmostEqual(usage.free, free, delta=tolerance)
        self.assertAlmostEqual(usage.percent, percent, delta=1)
def test_disks(self):
    """Cross-check psutil's partition and usage data against ``df -k``.

    For every partition returned by ``psutil.disk_partitions(all=False)``
    the device name and total size must equal what ``df`` prints, while
    the free and used figures may differ by at most 10 MB.
    """
    max_drift = 10 * 1024 * 1024  # 10 MB tolerance

    def df(path):
        """Return ``(device, total, used, free)`` for *path*, sizes in bytes."""
        rows = sh('df -k "%s"' % path).strip().split('\n')
        # rows[0] is the column header; rows[1] carries the figures.
        dev, total, used, free = rows[1].split()[:4]
        if dev == 'none':
            dev = ''
        # The parsed sizes are multiplied by 1024 to obtain bytes.
        return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        df_dev, df_total, df_used, df_free = df(part.mountpoint)
        self.assertEqual(part.device, df_dev)
        self.assertEqual(usage.total, df_total)
        # Free is checked before used, each against the same tolerance.
        for ps_value, df_value in ((usage.free, df_free),
                                   (usage.used, df_used)):
            if abs(ps_value - df_value) > max_drift:
                self.fail("psutil=%s, df=%s" % (ps_value, df_value))
def test_disk_partitions(self):
    """Pass ``psutil.disk_partitions`` to ``self.execute``."""
    target = psutil.disk_partitions
    self.execute(target)
def test_disks(self):
    """Cross-check psutil's partition and usage data against ``df -k``.

    For every partition returned by ``psutil.disk_partitions(all=False)``
    the device name and total size must equal what ``df`` prints, while
    the free and used figures may differ by at most 10 MB.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB

    def df(path):
        """Return ``(device, total, used, free)`` for *path*, sizes in bytes."""
        out = sh('df -k "%s"' % path).strip()
        # Line 0 is the column header; line 1 carries the figures.
        line = out.split('\n')[1]
        dev, total, used, free = line.split()[:4]
        if dev == 'none':
            dev = ''
        # The parsed sizes are multiplied by 1024 to obtain bytes.
        return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        dev, total, used, free = df(part.mountpoint)
        self.assertEqual(part.device, dev)
        self.assertEqual(usage.total, total)
        if abs(usage.free - free) > tolerance:
            # Both values go in one tuple: "%" binds tighter than the
            # comma, so passing them separately raises TypeError.
            self.fail("psutil=%s, df=%s" % (usage.free, free))
        if abs(usage.used - used) > tolerance:
            self.fail("psutil=%s, df=%s" % (usage.used, used))
# --- cpu
def test_disks(self):
    """Compare psutil's disk figures with WMI's ``Win32_LogicalDisk``.

    Each psutil partition is paired with the WMI disk whose ``DeviceID``
    equals the psutil device with backslashes removed.  The total size
    must match exactly and the free space within 10 MB.  The test fails
    if a psutil partition has no WMI counterpart.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB
    ps_parts = psutil.disk_partitions(all=True)
    wmi_parts = wmi.WMI().Win32_LogicalDisk()
    for ps_part in ps_parts:
        for wmi_part in wmi_parts:
            if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                continue
            if not ps_part.mountpoint:
                # this is usually a CD-ROM with no disk inserted
                break
            try:
                usage = psutil.disk_usage(ps_part.mountpoint)
            except OSError as err:
                if err.errno == errno.ENOENT:
                    # usually this is the floppy
                    break
                raise
            self.assertEqual(usage.total, int(wmi_part.Size))
            wmi_free = int(wmi_part.FreeSpace)
            # Free space is compared with a tolerance only: an exact
            # equality check would make the tolerance below unreachable.
            if abs(usage.free - wmi_free) > tolerance:
                self.fail("psutil=%s, wmi=%s" % (usage.free, wmi_free))
            break
        else:
            # The inner loop finished without a ``break``: no WMI match.
            self.fail("can't find partition %s" % repr(ps_part))
def test_disk_usage(self):
    """Compare ``psutil.disk_usage()`` with ``win32api.GetDiskFreeSpaceEx``.

    For each partition, element 0 of the win32api result is checked
    against psutil's free size and element 1 against its total, both
    within 1 MB.  psutil's used size must equal total minus free.
    """
    one_mb = 1024 * 1024
    for disk in psutil.disk_partitions():
        mount = disk.mountpoint
        sys_value = win32api.GetDiskFreeSpaceEx(mount)
        psutil_value = psutil.disk_usage(mount)
        self.assertAlmostEqual(
            sys_value[0], psutil_value.free, delta=one_mb)
        self.assertAlmostEqual(
            sys_value[1], psutil_value.total, delta=one_mb)
        expected_used = psutil_value.total - psutil_value.free
        self.assertEqual(psutil_value.used, expected_used)
def test_disk_partitions(self):
    """Check psutil's mount points against win32api's logical drives.

    The drive string is split on backslash + NUL, empty pieces and any
    piece starting with ``A:`` are dropped, and the trailing backslash is
    restored before comparing with psutil's mount points.
    """
    pieces = win32api.GetLogicalDriveStrings().split("\\\x00")
    sys_value = []
    for piece in pieces:
        if piece and not piece.startswith('A:'):
            sys_value.append(piece + '\\')
    psutil_value = [
        part.mountpoint for part in psutil.disk_partitions(all=True)]
    self.assertEqual(sys_value, psutil_value)
def test_disk_usage(self):
    """Compare ``psutil.disk_usage()`` with ``df -k`` for each partition.

    Sizes may differ by up to 4 MB and the usage percentage by up to 1.
    Partitions for which ``df`` fails with a "no such file or directory"
    or "raw devices not supported" error are skipped.
    """
    def df(device):
        """Return ``(total, used, free, percent)`` parsed from ``df -k``."""
        output = sh("df -k %s" % device).strip()
        # Line 0 is the column header; line 1 carries the figures.
        columns = output.split('\n')[1].split()
        total = int(columns[1]) * 1024
        used = int(columns[2]) * 1024
        free = int(columns[3]) * 1024
        percent = float(columns[4].replace('%', ''))
        return (total, used, free, percent)

    tolerance = 4 * 1024 * 1024  # 4MB
    skippable = ("no such file or directory", "raw devices not supported")
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        try:
            total, used, free, percent = df(part.device)
        except RuntimeError as err:
            # see:
            # https://travis-ci.org/giampaolo/psutil/jobs/138338464
            # https://travis-ci.org/giampaolo/psutil/jobs/138343361
            message = str(err).lower()
            if any(text in message for text in skippable):
                continue
            raise
        self.assertAlmostEqual(usage.total, total, delta=tolerance)
        self.assertAlmostEqual(usage.used, used, delta=tolerance)
        self.assertAlmostEqual(usage.free, free, delta=tolerance)
        self.assertAlmostEqual(usage.percent, percent, delta=1)