def mounts(self, detectdev=False):
mounts = []
with open('/proc/mounts', 'r') as f:
for line in f:
dev, path, fstype = line.split()[0:3]
if fstype in ('ext2', 'ext3', 'ext4', 'xfs',
'jfs', 'reiserfs', 'btrfs',
'simfs'): # simfs: filesystem in OpenVZ
if not os.path.isdir(path): continue
mounts.append({'dev': dev, 'path': path, 'fstype': fstype})
for mount in mounts:
stat = os.statvfs(mount['path'])
total = stat.f_blocks*stat.f_bsize
free = stat.f_bfree*stat.f_bsize
used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize
mount['total'] = b2h(total)
mount['free'] = b2h(free)
mount['used'] = b2h(used)
mount['used_rate'] = div_percent(used, total)
if detectdev:
dev = os.stat(mount['path']).st_dev
mount['major'], mount['minor'] = os.major(dev), os.minor(dev)
return mounts
python类statvfs()的实例源码
def used_inodes_percent(self, check_config):
check_config = self._strip_percent_sign_from_check_config(check_config)
for part in self.psutil.disk_partitions(all=False):
s = os.statvfs(part.mountpoint)
try:
inodes_usage = int((s.f_files - s.f_favail) * 100 / s.f_files)
except ZeroDivisionError:
continue
status = self._value_to_status_less(
inodes_usage, check_config, self._strip_percent_sign
)
if status != self.STATUS_OK:
return (
status,
'Partition {} uses {}% of inodes'.format(part.mountpoint,
inodes_usage)
)
return self.STATUS_OK, 'Inodes usage correct'
def get_path_usage(path):
space_st = os.statvfs(path)
# f_bavail: without blocks reserved for super users
# f_bfree: with blocks reserved for super users
avail = space_st.f_frsize * space_st.f_bavail
capa = space_st.f_frsize * space_st.f_blocks
used = capa - avail
return {
'total': capa,
'used': used,
'available': avail,
'percent': float(used) / capa,
}
def _check_dir_free_space(chk_dir, required_space=1):
"""Check that directory has some free space.
:param chk_dir: Directory to check
:param required_space: amount of space to check for in MiB.
:raises InsufficientDiskSpace: if free space is < required space
"""
# check that we have some free space
stat = os.statvfs(chk_dir)
# get dir free space in MiB.
free_space = float(stat.f_bsize * stat.f_bavail) / 1024 / 1024
# check for at least required_space MiB free
if free_space < required_space:
raise exception.InsufficientDiskSpace(path=chk_dir,
required=required_space,
actual=free_space)
def select(self):
currentFolder = self.getPreferredFolder()
# Do nothing unless current Directory is valid
if currentFolder is not None:
# Check if we need to have a minimum of free Space available
if self.minFree is not None:
# Try to read fs stats
try:
s = os.statvfs(currentFolder)
if (s.f_bavail * s.f_bsize) / 1000000 > self.minFree:
# Automatically confirm if we have enough free disk Space available
return self.selectConfirmed(True)
except OSError:
pass
# Ask User if he really wants to select this folder
self.session.openWithCallback(
self.selectConfirmed,
MessageBox,
_("There might not be enough Space on the selected Partition.\nDo you really want to continue?"),
type = MessageBox.TYPE_YESNO
)
# No minimum free Space means we can safely close
else:
self.selectConfirmed(True)
def update(self):
try:
stat = statvfs(self.path)
except OSError:
return -1
if self.type == self.FREE:
try:
percent = '(' + str((100 * stat.f_bavail) // stat.f_blocks) + '%)'
free = stat.f_bfree * stat.f_bsize
if free < 10000000:
free = _("%d kB") % (free >> 10)
elif free < 10000000000:
free = _("%d MB") % (free >> 20)
else:
free = _("%d Gb") % (free >> 30)
self.setText(" ".join((free, percent, _("free diskspace"))))
except:
# occurs when f_blocks is 0 or a similar error
self.setText("-?-")
def getDiskInfo(self, path):
def isMountPoint():
try:
fd = open('/proc/mounts', 'r')
for line in fd:
l = line.split()
if len(l) > 1 and l[1] == path:
return True
fd.close()
except:
return None
return False
result = [0,0,0,0] # (size, used, avail, use%)
if isMountPoint():
try:
st = statvfs(path)
except:
st = None
if not st is None and not 0 in (st.f_bsize, st.f_blocks):
result[0] = st.f_bsize * st.f_blocks # size
result[2] = st.f_bsize * st.f_bavail # avail
result[1] = result[0] - result[2] # used
result[3] = result[1] * 100 / result[0] # use%
return result
def diskSize(self):
cap = 0
try:
line = readFile(self.sysfsPath('size'))
cap = int(line)
return cap / 1000 * 512 / 1000
except:
dev = self.findMount()
if dev:
try:
stat = os.statvfs(dev)
cap = int(stat.f_blocks * stat.f_bsize)
return cap / 1000 / 1000
except:
pass
return cap
def diskUsage(path):
"""Return disk usage statistics about the given path. Inspired by:
https://stackoverflow.com/questions/4274899/get-actual-disk-space
Returned values is a named tuple with attributes 'total', 'used' and
'free', which are the amount of total, used and free space, in bytes.
"""
st = os.statvfs(path)
free = st.f_bavail * st.f_frsize
total = st.f_blocks * st.f_frsize
used = (st.f_blocks - st.f_bfree) * st.f_frsize
return {'total':total, 'used':used, 'free':free}
########################
### Convert to FASTQ ###
########################
### Before beginning, check there is enough memory on disk
def getFreeSpace(self):
free_space = 0
if "statvfs" in dir(os): # Unix
statvfs = os.statvfs(config.data_dir)
free_space = statvfs.f_frsize * statvfs.f_bavail
else: # Windows
try:
import ctypes
free_space_pointer = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
ctypes.c_wchar_p(config.data_dir), None, None, ctypes.pointer(free_space_pointer)
)
free_space = free_space_pointer.value
except Exception, err:
self.log.debug("GetFreeSpace error: %s" % err)
return free_space
def stream_host_stats():
while True:
net = psutil.net_io_counters(pernic=True)
time.sleep(1)
net1 = psutil.net_io_counters(pernic=True)
net_stat_download = {}
net_stat_upload = {}
for k, v in net.items():
for k1, v1 in net1.items():
if k1 == k:
net_stat_download[k] = (v1.bytes_recv - v.bytes_recv) / 1000.
net_stat_upload[k] = (v1.bytes_sent - v.bytes_sent) / 1000.
ds = statvfs('/')
disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9, "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9}
yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \
% (psutil.cpu_percent(interval=1), psutil.virtual_memory().used, psutil.virtual_memory().free, \
net_stat_download, net_stat_upload, disk_str)
def disk_usage(path):
"""Return disk usage associated with path."""
try:
st = os.statvfs(path)
except UnicodeEncodeError:
if not PY3 and isinstance(path, unicode):
# this is a bug with os.statvfs() and unicode on
# Python 2, see:
# - https://github.com/giampaolo/psutil/issues/416
# - http://bugs.python.org/issue18695
try:
path = path.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
st = os.statvfs(path)
else:
raise
free = (st.f_bavail * st.f_frsize)
total = (st.f_blocks * st.f_frsize)
used = (st.f_blocks - st.f_bfree) * st.f_frsize
percent = usage_percent(used, total, _round=1)
# NB: the percentage is -5% than what shown by df due to
# reserved blocks that we are currently not considering:
# http://goo.gl/sWGbH
return sdiskusage(total, used, free, percent)
def statfs(self, path):
full_path = self.join_path(path)
stv = os.statvfs(full_path)
stat = dict((key, getattr(stv, key))
for key in ('f_bavail',
'f_bfree',
'f_blocks',
'f_bsize',
'f_favail',
'f_ffree',
'f_files',
'f_flag',
'f_frsize',
'f_namemax'
)
)
return stat
def statfs(self, path):
full_path = self.join_path(path)
stv = os.statvfs(full_path)
stat = dict((key, getattr(stv, key))
for key in ('f_bavail',
'f_bfree',
'f_blocks',
'f_bsize',
'f_favail',
'f_ffree',
'f_files',
'f_flag',
'f_frsize',
'f_namemax'
)
)
return stat
def statfs(self, path):
full_path = self._full_path(path)
stv = os.statvfs(full_path)
stat = dict((key, getattr(stv, key))
for key in ('f_bavail',
'f_bfree',
'f_blocks',
'f_bsize',
'f_favail',
'f_ffree',
'f_files',
'f_flag',
'f_frsize',
'f_namemax'
)
)
return stat
def mounts(self, detectdev=False):
mounts = []
with open('/proc/mounts', 'r') as f:
for line in f:
dev, path, fstype = line.split()[0:3]
if fstype in ('ext2', 'ext3', 'ext4', 'xfs',
'jfs', 'reiserfs', 'btrfs',
'simfs'): # simfs: filesystem in OpenVZ
if not os.path.isdir(path): continue
mounts.append({'dev': dev, 'path': path, 'fstype': fstype})
for mount in mounts:
stat = os.statvfs(mount['path'])
total = stat.f_blocks*stat.f_bsize
free = stat.f_bfree*stat.f_bsize
used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize
mount['total'] = b2h(total)
mount['free'] = b2h(free)
mount['used'] = b2h(used)
mount['used_rate'] = div_percent(used, total)
if detectdev:
dev = os.stat(mount['path']).st_dev
mount['major'], mount['minor'] = os.major(dev), os.minor(dev)
return mounts
def mounts(self, detectdev=False):
mounts = []
with open('/proc/mounts', 'r') as f:
for line in f:
dev, path, fstype = line.split()[0:3]
if fstype in ('ext2', 'ext3', 'ext4', 'xfs',
'jfs', 'reiserfs', 'btrfs',
'simfs'): # simfs: filesystem in OpenVZ
if not os.path.isdir(path): continue
mounts.append({'dev': dev, 'path': path, 'fstype': fstype})
for mount in mounts:
stat = os.statvfs(mount['path'])
total = stat.f_blocks*stat.f_bsize
free = stat.f_bfree*stat.f_bsize
used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize
mount['total'] = b2h(total)
mount['free'] = b2h(free)
mount['used'] = b2h(used)
mount['used_rate'] = div_percent(used, total)
if detectdev:
dev = os.stat(mount['path']).st_dev
mount['major'], mount['minor'] = os.major(dev), os.minor(dev)
return mounts
def _get_stats(self, mount_point):
data = os.statvfs(mount_point)
total = (data.f_blocks * data.f_bsize)
free = (data.f_bfree * data.f_bsize)
used_percent = 100 - (100.0 * free / total)
total_inode = data.f_files
free_inode = data.f_ffree
used_percent_inode = 100 - (100.0 * free_inode / total_inode)
used = total - free
used_inode = total_inode - free_inode
return {'total': total,
'free': free,
'used_percent': used_percent,
'total_inode': total_inode,
'free_inode': free_inode,
'used_inode': used_inode,
'used': used,
'used_percent_inode': used_percent_inode}
def disk_usage(path):
"""Return disk usage associated with path."""
try:
st = os.statvfs(path)
except UnicodeEncodeError:
if not PY3 and isinstance(path, unicode):
# this is a bug with os.statvfs() and unicode on
# Python 2, see:
# - https://github.com/giampaolo/psutil/issues/416
# - http://bugs.python.org/issue18695
try:
path = path.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
st = os.statvfs(path)
else:
raise
free = (st.f_bavail * st.f_frsize)
total = (st.f_blocks * st.f_frsize)
used = (st.f_blocks - st.f_bfree) * st.f_frsize
percent = usage_percent(used, total, _round=1)
# NB: the percentage is -5% than what shown by df due to
# reserved blocks that we are currently not considering:
# http://goo.gl/sWGbH
return sdiskusage(total, used, free, percent)
def select(self):
currentFolder = self.getPreferredFolder()
# Do nothing unless current Directory is valid
if currentFolder is not None:
# Check if we need to have a minimum of free Space available
if self.minFree is not None:
# Try to read fs stats
try:
s = os.statvfs(currentFolder)
if (s.f_bavail * s.f_bsize) / 1000000 > self.minFree:
# Automatically confirm if we have enough free disk Space available
return self.selectConfirmed(True)
except OSError:
pass
# Ask User if he really wants to select this folder
self.session.openWithCallback(
self.selectConfirmed,
MessageBox,
_("There might not be enough Space on the selected Partition.\nDo you really want to continue?"),
type = MessageBox.TYPE_YESNO
)
# No minimum free Space means we can safely close
else:
self.selectConfirmed(True)
def update(self):
try:
stat = statvfs(self.path)
except OSError:
return -1
if self.type == self.FREE:
try:
percent = '(' + str((100 * stat.f_bavail) // stat.f_blocks) + '%)'
free = stat.f_bfree * stat.f_bsize
if free < 10000000:
free = "%d kB" % (free >> 10)
elif free < 10000000000:
free = "%d MB" % (free >> 20)
else:
free = "%d GB" % (free >> 30)
self.setText(_("%s %s free disk space") % (free, percent))
except:
# occurs when f_blocks is 0 or a similar error
self.setText("-?-")
def diskSize(self):
cap = 0
try:
line = readFile(self.sysfsPath('size'))
cap = int(line)
return cap / 1000 * 512 / 1000
except:
dev = self.findMount()
if dev:
try:
stat = os.statvfs(dev)
cap = int(stat.f_blocks * stat.f_bsize)
return cap / 1000 / 1000
except:
pass
return cap
def main():
args = parse_args()
mountpoints = []
with open('/proc/mounts', 'r') as fp:
for line in fp:
a, mountpoint, fstype, a = line.split(' ', 3)
if fstype in ['ext2', 'ext3', 'ext4', 'xfs']:
mountpoints.append(mountpoint)
template = args.prefix + '.{}.{} {} ' + str(int(time()))
for mp in mountpoints:
stat = os.statvfs(mp)
used = stat.f_frsize * stat.f_blocks - stat.f_bfree * stat.f_bsize
size = stat.f_frsize * stat.f_blocks
if mp == '/':
mp = 'rootfs'
mp = mp.replace('/', '_').lstrip('_')
print(template.format(mp, 'used', used))
print(template.format(mp, 'size', size))
def test_structseq(self):
import time
import os
t = time.localtime()
for proto in protocols:
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "stat"):
t = os.stat(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "statvfs"):
t = os.statvfs(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
# Tests for protocol 2
def test_sample_free_space(self):
"""Test collecting information about free space."""
counter = mock_counter(1)
def statvfs(path, multiplier=lambda: next(counter)):
return os.statvfs_result(
(4096, 0, mb(1000), mb(multiplier() * 100), 0, 0, 0, 0, 0, 0))
plugin = self.get_mount_info(
statvfs=statvfs, create_time=self.reactor.time)
step_size = self.monitor.step_size
self.monitor.add(plugin)
self.reactor.advance(step_size)
message = plugin.create_free_space_message()
self.assertTrue(message)
self.assertEqual(message.get("type"), "free-space")
free_space = message.get("free-space", ())
self.assertEqual(len(free_space), 1)
self.assertEqual(free_space[0], (step_size, "/", 409600))
def test_resynchronize(self):
"""
On the reactor "resynchronize" event, new mount-info messages
should be sent.
"""
plugin = self.get_mount_info(
create_time=self.reactor.time, statvfs=statvfs_result_fixture)
self.monitor.add(plugin)
plugin.run()
plugin.exchange()
self.reactor.fire("resynchronize", scopes=["disk"])
plugin.run()
plugin.exchange()
messages = self.mstore.get_pending_messages()
messages = [message for message in messages
if message["type"] == "mount-info"]
expected_message = {
"type": "mount-info",
"mount-info": [(0, {"device": "/dev/hda1", "mount-point": "/",
"total-space": 4096000,
"filesystem": "ext3"})]}
self.assertMessages(messages, [expected_message, expected_message])
def __init__(self, interval=300, monitor_interval=60 * 60,
mounts_file="/proc/mounts", create_time=time.time,
statvfs=None, mtab_file="/etc/mtab"):
self.run_interval = interval
self._monitor_interval = monitor_interval
self._create_time = create_time
self._mounts_file = mounts_file
self._mtab_file = mtab_file
if statvfs is None:
statvfs = os.statvfs
self._statvfs = statvfs
self._create_time = create_time
self._free_space = []
self._mount_info = []
self._mount_info_to_persist = None
self.is_device_removable = is_device_removable
def _get_drive_usage(path):
"""
Use Python libraries to get drive space/usage statistics. Prior to v3.3, use `os.statvfs`;
on v3.3+, use the more accurate `shutil.disk_usage`.
"""
if sys.version_info >= (3, 3):
usage = shutil.disk_usage(path)
return {
"total": usage.total,
"used": usage.used,
"free": usage.free,
}
else:
# with os.statvfs, we need to multiple block sizes by block counts to get bytes
stats = os.statvfs(path)
total = stats.f_frsize * stats.f_blocks
free = stats.f_frsize * stats.f_bavail
return {
"total": total,
"free": free,
"used": total - free,
}
def get_free_space(path=settings.KOLIBRI_HOME):
if sys.platform.startswith('win'):
import ctypes
free = ctypes.c_ulonglong(0)
check = ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(free))
if check == 0:
raise ctypes.winError()
result = free.value
else:
st = os.statvfs(path)
result = st.f_bavail * st.f_frsize
return result
# Utility functions for pinging or killing PIDs
def test_structseq(self):
import time
import os
t = time.localtime()
for proto in protocols:
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "stat"):
t = os.stat(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "statvfs"):
t = os.statvfs(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
# Tests for protocol 2