def test_disk_partitions_and_usage(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -P -B 1 "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total, used, free = int(total), int(used), int(free)
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.free, free))
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.used, used))
python类disk_partitions()的实例源码
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def btrfs_get_mountpoints_of_subvol(subvol='', **kwargs):
"""
Determine the list of mountpoints for a given subvol (of the form @/foo/bar).
Returns a list of mountpoint(s), or an empty list.
"""
mountpoints = []
if not subvol:
return []
# Seems the easiest way to do this is to walk the disk partitions, extract the opts
# string and see if subvol is present. Remember the leading '/'.
for part in psutil.disk_partitions():
if "subvol=/{}".format(subvol) in part.opts:
mountpoints.append(part.mountpoint)
return mountpoints
# pylint: disable=unused-argument
def get_mountpoint_opts(mountpoint='', **kwargs):
"""
Determine the mount options set for a given mountpoint.
Returns a list of mount opts or None on error. For opts in the form 'key=val',
convert the opt into dictionary. Thus, our return structure may look
something like:
[ 'rw', 'relatime', ..., { 'subvolid': '259' }, ... ]'
"""
opts = None
for part in psutil.disk_partitions():
if part.mountpoint == mountpoint:
opts = part.opts.split(',')
# Convert foo=bar to dictionary entries if opts is not None or not an empty list.
opts = [o if '=' not in o else {k: v for (k, v) in [tuple(o.split('='))]}
for o in opts] if opts else None
if not opts:
log.error("Failed to determine mount opts for '{}'.".format(mountpoint))
return opts
def get_file_systems(self):
systems = psutil.disk_partitions()
for i in range(0, len(systems)):
system = systems[i]
system_options = {}
for option in system.opts.split(','):
option_local = prism.helpers.locale_('system', 'mount.options.' + option)
if option != option_local:
system_options[option] = option_local
else:
system_options[option] = prism.helpers.locale_('system', 'mount.options.unknown')
systems[i] = {'device': system.device, 'mount_point': system.mountpoint,
'fs_type': system.fstype, 'options': system_options,
'usage': psutil.disk_usage(system.mountpoint)}
return systems
def disksinfo(self):
values = []
disk_partitions = psutil.disk_partitions(all=False)
for partition in disk_partitions:
usage = psutil.disk_usage(partition.mountpoint)
device = {'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'opts': partition.opts,
'total': usage.total,
'used': usage.used,
'free': usage.free,
'percent': usage.percent
}
values.append(device)
values = sorted(values, key=lambda device: device['device'])
return values
def add_device(request, device_path, bucket=None):
ceph.utils.osdize(dev, hookenv.config('osd-format'),
ceph_hooks.get_journal_devices(),
hookenv.config('osd-reformat'),
hookenv.config('ignore-device-errors'),
hookenv.config('osd-encrypt'),
hookenv.config('bluestore'))
# Make it fast!
if hookenv.config('autotune'):
ceph.utils.tune_dev(dev)
mounts = filter(lambda disk: device_path
in disk.device, psutil.disk_partitions())
if mounts:
osd = mounts[0]
osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
request.ops.append({
'op': 'move-osd-to-bucket',
'osd': "osd.{}".format(osd_id),
'bucket': bucket})
return request
def disk(self):
"""
Per system partition disk usage on system
Converting byte to human readable format
Return dict
"""
disk_info = {}
parts = self.disk_partitions
for i in parts:
part = i.mountpoint
part_usage = self.partition(part)
total = self.hr(part_usage.total)
used = self.hr(part_usage.used)
free = self.hr(part_usage.free)
percent = part_usage.percent
disk_info[part] = {
"total": total,
"used": used,
"free": free,
"percent": percent
}
return disk_info
def test_disk_partitions_mocked(self):
# Test that ZFS partitions are returned.
with open("/proc/filesystems", "r") as f:
data = f.read()
if 'zfs' in data:
for part in psutil.disk_partitions():
if part.fstype == 'zfs':
break
else:
self.fail("couldn't find any ZFS partition")
else:
# No ZFS partitions on this system. Let's fake one.
fake_file = io.StringIO(u("nodev\tzfs\n"))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m1:
with mock.patch(
'psutil._pslinux.cext.disk_partitions',
return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
ret = psutil.disk_partitions()
assert m1.called
assert m2.called
assert ret
self.assertEqual(ret[0].fstype, 'zfs')
def add_device(request, device_path, bucket=None):
ceph.osdize(dev, config('osd-format'),
get_journal_devices(), config('osd-reformat'),
config('ignore-device-errors'),
config('osd-encrypt'),
config('bluestore'))
# Make it fast!
if config('autotune'):
ceph.tune_dev(dev)
mounts = filter(lambda disk: device_path
in disk.device, psutil.disk_partitions())
if mounts:
osd = mounts[0]
osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
request.ops.append({
'op': 'move-osd-to-bucket',
'osd': "osd.{}".format(osd_id),
'bucket': bucket})
return request
def handle_drive_intent(self, message):
partitions = psutil.disk_partitions()
for partition in partitions:
print("partition.mountpoint: %s" % partition.mountpoint)
if partition.mountpoint.startswith("/snap/"):
continue
partition_data = psutil.disk_usage(partition.mountpoint)
# total=21378641920, used=4809781248, free=15482871808,
# percent=22.5
data = {
"mountpoint": partition.mountpoint,
"total": sizeof_fmt(partition_data.total),
"used": sizeof_fmt(partition_data.used),
"free": sizeof_fmt(partition_data.free),
"percent": partition_data.percent
}
if partition_data.percent >= 90:
self.speak_dialog("drive.low", data)
else:
self.speak_dialog("drive", data)
def get_disk_info(self):
data = []
try:
disks = psutil.disk_partitions(all=True)
for disk in disks:
if not disk.device:
continue
if disk.opts.upper() in ('CDROM', 'REMOVABLE'):
continue
item = {}
item['name'] = disk.device
item['device'] = disk.device
item['mountpoint'] = disk.mountpoint
item['fstype'] = disk.fstype
item['size'] = psutil.disk_usage(disk.mountpoint).total >> 10
data.append(item)
data.sort(key=lambda x: x['device'])
except:
data = []
self.logger.error(traceback.format_exc())
return data
def test_disk_partitions_and_usage(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -P -B 1 "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total, used, free = int(total), int(used), int(free)
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.free, free))
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.used, used))
def test_disk_partitions_mocked(self):
# Test that ZFS partitions are returned.
with open("/proc/filesystems", "r") as f:
data = f.read()
if 'zfs' in data:
for part in psutil.disk_partitions():
if part.fstype == 'zfs':
break
else:
self.fail("couldn't find any ZFS partition")
else:
# No ZFS partitions on this system. Let's fake one.
fake_file = io.StringIO(u("nodev\tzfs\n"))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m1:
with mock.patch(
'psutil._pslinux.cext.disk_partitions',
return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
ret = psutil.disk_partitions()
assert m1.called
assert m2.called
assert ret
self.assertEqual(ret[0].fstype, 'zfs')
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def test_disk_partitions_and_usage(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -P -B 1 "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total, used, free = int(total), int(used), int(free)
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.free, free))
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.used, used))
def test_disk_partitions_mocked(self):
# Test that ZFS partitions are returned.
with open("/proc/filesystems", "r") as f:
data = f.read()
if 'zfs' in data:
for part in psutil.disk_partitions():
if part.fstype == 'zfs':
break
else:
self.fail("couldn't find any ZFS partition")
else:
# No ZFS partitions on this system. Let's fake one.
fake_file = io.StringIO(u("nodev\tzfs\n"))
with mock.patch('psutil._pslinux.open',
return_value=fake_file, create=True) as m1:
with mock.patch(
'psutil._pslinux.cext.disk_partitions',
return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
ret = psutil.disk_partitions()
assert m1.called
assert m2.called
assert ret
self.assertEqual(ret[0].fstype, 'zfs')
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def get_disk_rate_info(self):
returnData = {}
returnData['disk_total'] = {}
returnData['disk_used'] = {}
returnData['disk_percent'] = {}
try:
disk = psutil.disk_partitions()
for val in disk:
if val.fstype != "":
mountpoint = val.mountpoint
one = psutil.disk_usage(mountpoint)
tmp = one.total/1024/1024/1024.0
returnData['disk_total'][mountpoint] = "%.2f" % tmp
tmp = one.used/1024/1024/1024.0
returnData['disk_used'][mountpoint] = "%.2f" % tmp
returnData['disk_percent'][mountpoint] = one.percent
except Exception:
pybixlib.error(self.logHead + traceback.format_exc())
self.errorInfoDone(traceback.format_exc())
return returnData
def _get_metrics(self):
inode_files = 0
inode_free = 0
self.metrics['plugin_version'] = PLUGIN_VERSION
self.metrics['heartbeat_required'] = HEARTBEAT
for part in psutil.disk_partitions(all=False):
inode_stats = self.__get_inode_stats(part.mountpoint)
inode_files += inode_stats.f_files
inode_free += inode_stats.f_ffree
inode_use_pct = 0
inode_used = inode_files - inode_free
self.metrics['inode_total'] = inode_files
self.metrics['inode_used'] = inode_used
self.metrics['inode_free'] = inode_free
if inode_files > 0:
inode_use_pct = "{:.2f}".format((inode_used * 100.0) / inode_files )
self.metrics['inode_use_percent'] = inode_use_pct
self.units['inode_use_percent'] = "%"
self.metrics["units"] = self.units
return self.metrics
def main():
table = prettytable.PrettyTable(border=False, header=True, left_padding_width=2, padding_width=1)
table.field_names = ["Device", "Total", "Used", "Free", "Use%", "Type", "Mount"]
for part in psutil.disk_partitions(all=False):
if os.name == 'nt':
if 'cdrom' in part.opts or part.fstype == '':
# skip cd-rom drives with no disk in it; they may raise
# ENOENT, pop-up a Windows GUI error for a non-ready
# partition or just hang.
continue
if 'docker' in part.mountpoint and 'aufs' in part.mountpoint:
continue
usage = psutil.disk_usage(part.mountpoint)
table.add_row([part.device,
bytes2human(usage.total),
bytes2human(usage.used),
bytes2human(usage.free),
str(int(usage.percent)) + '%',
part.fstype,
part.mountpoint])
for field in table.field_names:
table.align[field] = "l"
print table
def collect_diskinfo(self):
global workercinfo
parts = psutil.disk_partitions()
setval = []
devices = {}
for part in parts:
# deal with each partition
if not part.device in devices:
devices[part.device] = 1
diskval = {}
diskval['device'] = part.device
diskval['mountpoint'] = part.mountpoint
try:
usage = psutil.disk_usage(part.mountpoint)
diskval['total'] = usage.total
diskval['used'] = usage.used
diskval['free'] = usage.free
diskval['percent'] = usage.percent
if(part.mountpoint.startswith('/opt/docklet/local/volume')):
# the mountpoint indicate that the data is the disk used information of a container
names = re.split('/',part.mountpoint)
container = names[len(names)-1]
if not container in workercinfo.keys():
workercinfo[container] = {}
workercinfo[container]['disk_use'] = diskval
setval.append(diskval) # make a list
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
#print(output)
#print(diskparts)
return setval
# collect operating system information
def get_all_mountpoint(all=False):
partitions = psutil.disk_partitions(all=all)
prt_by_mp = [x.mountpoint for x in partitions]
return prt_by_mp
def get_disk_partitions():
partitions = psutil.disk_partitions(all=True)
by_mount_point = {}
for pt in partitions:
# OrderedDict([
# ('device', '/dev/disk1'),
# ('mountpoint', '/'),
# ('fstype', 'hfs'),
# ('opts', 'rw,local,rootfs,dovolfs,journaled,multilabel')])
by_mount_point[pt.mountpoint] = _to_dict(pt)
return by_mount_point
def collect_system_information(self):
values = {}
mem = psutil.virtual_memory()
values['memory_total'] = mem.total
import cpuinfo
cpu = cpuinfo.get_cpu_info()
values['resources_limit'] = self.resources_limit
values['cpu_name'] = cpu['brand']
values['cpu'] = [cpu['hz_advertised_raw'][0], cpu['count']]
values['nets'] = {}
values['disks'] = {}
values['gpus'] = {}
values['boot_time'] = psutil.boot_time()
try:
for gpu_id, gpu in enumerate(aetros.cuda_gpu.get_ordered_devices()):
gpu['available'] = gpu_id in self.enabled_gpus
values['gpus'][gpu_id] = gpu
except Exception: pass
for disk in psutil.disk_partitions():
try:
name = self.get_disk_name(disk[1])
values['disks'][name] = psutil.disk_usage(disk[1]).total
except Exception:
# suppress Operation not permitted
pass
try:
for id, net in psutil.net_if_stats().items():
if 0 != id.find('lo') and net.isup:
self.nets.append(id)
values['nets'][id] = net.speed or 1000
except Exception:
# suppress Operation not permitted
pass
return values
def disk_partitions(self):
"""
System disk partitions
Return list
"""
partitions = psutil.disk_partitions()
return partitions
def test_disk_partitions(self):
self.execute(psutil.disk_partitions)
def test_disks(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -k "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total = int(total) * 1024
used = int(used) * 1024
free = int(free) * 1024
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(part.device, dev)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.free, free)
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.used, used)
def test_disks(self):
ps_parts = psutil.disk_partitions(all=True)
wmi_parts = wmi.WMI().Win32_LogicalDisk()
for ps_part in ps_parts:
for wmi_part in wmi_parts:
if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
if not ps_part.mountpoint:
# this is usually a CD-ROM with no disk inserted
break
try:
usage = psutil.disk_usage(ps_part.mountpoint)
except OSError as err:
if err.errno == errno.ENOENT:
# usually this is the floppy
break
else:
raise
self.assertEqual(usage.total, int(wmi_part.Size))
wmi_free = int(wmi_part.FreeSpace)
self.assertEqual(usage.free, wmi_free)
# 10 MB tollerance
if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
self.fail("psutil=%s, wmi=%s" % (
usage.free, wmi_free))
break
else:
self.fail("can't find partition %s" % repr(ps_part))
def test_disk_usage(self):
def df(device):
out = sh("df -k %s" % device).strip()
line = out.split('\n')[1]
fields = line.split()
total = int(fields[1]) * 1024
used = int(fields[2]) * 1024
free = int(fields[3]) * 1024
percent = float(fields[4].replace('%', ''))
return (total, used, free, percent)
tolerance = 4 * 1024 * 1024 # 4MB
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
try:
total, used, free, percent = df(part.device)
except RuntimeError as err:
# see:
# https://travis-ci.org/giampaolo/psutil/jobs/138338464
# https://travis-ci.org/giampaolo/psutil/jobs/138343361
if "no such file or directory" in str(err).lower() or \
"raw devices not supported" in str(err).lower():
continue
else:
raise
else:
self.assertAlmostEqual(usage.total, total, delta=tolerance)
self.assertAlmostEqual(usage.used, used, delta=tolerance)
self.assertAlmostEqual(usage.free, free, delta=tolerance)
self.assertAlmostEqual(usage.percent, percent, delta=1)