def test_structseq(self):
    """Round-trip struct sequence objects through every pickle protocol.

    The samples are a ``time.localtime()`` value plus, where the ``os``
    module provides them, the results of ``os.stat(os.curdir)`` and
    ``os.statvfs(os.curdir)``.  Each sample is dumped with ``self.dumps``,
    restored with ``self.loads`` and compared to the original with
    ``assertEqual``.

    The samples are collected once, each under its own list slot, so that
    every protocol is exercised against every kind of struct sequence
    (reusing a single variable inside the protocol loop would replace the
    ``struct_time`` sample after the first iteration).
    """
    import time
    import os

    samples = [time.localtime()]
    if hasattr(os, "stat"):
        samples.append(os.stat(os.curdir))
    if hasattr(os, "statvfs"):
        samples.append(os.statvfs(os.curdir))

    for proto in protocols:
        for original in samples:
            pickled = self.dumps(original, proto)
            restored = self.loads(pickled)
            self.assertEqual(original, restored)
# Tests for protocol 2
# Example source code using Python's statvfs()
def fsbsize(path):
    """
    Get optimal file system buffer size (in bytes) for I/O calls

    On Windows (``os.name == "nt"``) the value is sectors-per-cluster times
    bytes-per-sector as reported by ``GetDiskFreeSpaceW`` for the drive of
    ``path``; the call's return code is not checked, so the result is 0 if
    the out-parameters were never filled in.  On every other platform it is
    ``os.statvfs(path).f_frsize``.

    ``path`` is first passed through the ``encode`` helper defined elsewhere
    in this module.
    """
    path = encode(path)

    if os.name == "nt":
        import ctypes

        drive = "%s\\" % os.path.splitdrive(path)[0]
        # GetDiskFreeSpaceW writes DWORD values, and each out-parameter
        # needs its own ctypes object to receive the result.
        cluster_sectors = ctypes.c_ulong(0)
        sector_size = ctypes.c_ulong(0)

        ctypes.windll.kernel32.GetDiskFreeSpaceW(ctypes.c_wchar_p(drive),
                                                 ctypes.byref(cluster_sectors),
                                                 ctypes.byref(sector_size),
                                                 None,
                                                 None)
        # Multiply the plain integers, not the ctypes wrappers.
        return cluster_sectors.value * sector_size.value

    else:
        return os.statvfs(path).f_frsize
def test_structseq(self):
    """Check that struct sequences survive a pickle round trip.

    Samples: ``time.localtime()`` and, when the ``os`` module has them,
    ``os.stat(os.curdir)`` and ``os.statvfs(os.curdir)``.  For every entry
    of ``protocols`` each sample is serialized with ``self.dumps``,
    deserialized with ``self.loads`` and compared with ``assertEqual``.

    Samples live in their own list rather than in one reused variable, so
    the ``struct_time`` value is still tested for protocols after the first.
    """
    import time
    import os

    samples = [time.localtime()]
    if hasattr(os, "stat"):
        samples.append(os.stat(os.curdir))
    if hasattr(os, "statvfs"):
        samples.append(os.statvfs(os.curdir))

    for proto in protocols:
        for original in samples:
            restored = self.loads(self.dumps(original, proto))
            self.assertEqual(original, restored)
def bestRecordingLocation(candidates):
    """Return the candidate path whose file system scores highest.

    ``candidates`` is an iterable of indexable entries; element ``[1]`` of
    each entry is used as the path handed to ``os.statvfs``.  A candidate is
    only considered when ``f_bavail`` is non-zero, and its score is
    ``(f_blocks + f_bavail) * f_bsize``.

    Any exception raised while examining one candidate is printed with a
    ``[DRL]`` prefix and that candidate is skipped.  Returns ``''`` when no
    candidate qualifies.
    """
    path = ''
    biggest = 0
    for candidate in candidates:
        try:
            stat = os.statvfs(candidate[1])
            # must have some free space (i.e. not read-only)
            if stat.f_bavail:
                # Free space counts double
                size = (stat.f_blocks + stat.f_bavail) * stat.f_bsize
                if size > biggest:
                    path = candidate[1]
                    biggest = size
        except Exception as e:
            # Single pre-formatted argument: prints the same text whether
            # ``print`` is a statement (Python 2) or a function (Python 3).
            print("[DRL] %s" % (e,))
    return path
def select(self):
    """Confirm the preferred folder, asking the user when space may be short.

    Does nothing when ``self.getPreferredFolder()`` returns ``None``.  When
    ``self.minFree`` is ``None`` the selection is confirmed immediately.
    Otherwise the folder is confirmed automatically (and the result of
    ``self.selectConfirmed(True)`` returned) only if
    ``f_bavail * f_bsize / 1000000`` exceeds ``self.minFree``; if that is not
    the case, or an ``OSError`` occurs, a yes/no ``MessageBox`` is opened
    with ``self.selectConfirmed`` as its callback.
    """
    folder = self.getPreferredFolder()

    # Do nothing unless the current directory is valid
    if folder is None:
        return

    # No minimum free space means we can safely close
    if self.minFree is None:
        self.selectConfirmed(True)
        return

    # A minimum is required: try to read the file system stats
    try:
        fs_stats = os.statvfs(folder)
        free_amount = (fs_stats.f_bavail * fs_stats.f_bsize) / 1000000
        if free_amount > self.minFree:
            # Enough free disk space available: confirm automatically
            return self.selectConfirmed(True)
    except OSError:
        pass

    # Ask the user whether this folder should really be selected
    self.session.openWithCallback(
        self.selectConfirmed,
        MessageBox,
        _("There might not be enough space on the selected partition..\nDo you really want to continue?"),
        type = MessageBox.TYPE_YESNO
    )
def freespace(self):
    """Decide whether the recording directory can be used.

    Resolves a directory through ``findSafeRecordPath``: from
    ``self.dirname`` when it is set, falling back to ``defaultMoviePath()``
    (and setting ``self.dirnameHadToFallback``) when that yields ``None``,
    or straight from ``defaultMoviePath()`` when ``self.dirname`` is empty.

    ``self.MountPath`` is reset to ``None`` on entry and set to the resolved
    directory once one is found.  Returns ``False`` when no directory could
    be resolved, when it is not writeable, or when
    ``f_bavail * f_bsize / 1000000`` is below 1024; returns ``True``
    otherwise.  The outcome is reported through ``self.log``.
    """
    self.MountPath = None

    if self.dirname:
        target = findSafeRecordPath(self.dirname)
        if target is None:
            # Configured directory is unusable: fall back to the default
            target = findSafeRecordPath(defaultMoviePath())
            self.dirnameHadToFallback = True
    else:
        target = findSafeRecordPath(defaultMoviePath())

    if not target:
        return False

    self.MountPath = target
    if not os.access(target, os.W_OK):
        self.log(0, ("Mount '%s' is not writeable." % target))
        return False

    fs_stats = os.statvfs(target)
    if (fs_stats.f_bavail * fs_stats.f_bsize) / 1000000 >= 1024:
        self.log(0, "Found enough free space to record")
        return True

    self.log(0, "Not enough free space to record")
    return False
def disk_metrics(dirs=None):
    """Get the disk usage, in bytes.

    The returned dict always contains ``"used"``: ``f_frsize * (f_blocks -
    f_bfree)`` from ``os.statvfs("/")``.  In addition, every directory in
    ``dirs`` for which ``du -s -b`` printed a size gets an entry keyed by
    the directory name.

    Args:
        dirs: directories to measure with ``du``.  Defaults to
            ``["/home", "/nix", "/srv", "/tmp", "/var"]``.

    Per-directory figures are best effort: directories that ``du`` could
    not report on are simply absent from the result.
    """
    if dirs is None:
        dirs = ["/home", "/nix", "/srv", "/tmp", "/var"]
    dirs = list(dirs)

    # Overall disk usage
    statinfo = os.statvfs("/")
    metrics = {"used": statinfo.f_frsize * (statinfo.f_blocks - statinfo.f_bfree)}

    # Per-directory disk usage
    if not dirs:
        # Without operands ``du`` would measure the current directory.
        return metrics

    argv = ["du", "-s", "-b"] + dirs
    try:
        output = subprocess.check_output(argv)
    except subprocess.CalledProcessError as err:
        # ``du`` exits non-zero when any operand is missing or unreadable,
        # but it still prints a line for every directory it could measure.
        output = err.output or b""
    except OSError:
        # ``du`` could not be started at all; keep only the overall figure.
        output = b""

    # Each output line is "<size>\t<path>"; splitting on the tab keeps
    # paths containing spaces intact.
    sizes = {}
    for line in output.decode("utf-8", "replace").splitlines():
        size, _sep, name = line.partition("\t")
        try:
            sizes[name] = int(size)
        except ValueError:
            continue

    for dname in dirs:
        if dname in sizes:
            metrics[dname] = sizes[dname]
    return metrics
def disk():
    """Report disk and inode usage gauges to statsd, forever.

    Creates a ``statsd.StatsClient`` for ``STATSD_HOST`` on port 8125 with
    the prefix ``PREFIX + 'system.disk'``.  Then, in an endless loop, every
    ``(path, label)`` pair in ``PATHS`` is measured with
    ``psutil.disk_usage`` and ``os.statvfs`` and the following gauges are
    sent: ``<label>.inodes.percent``, ``<label>.total``, ``<label>.used``,
    ``<label>.free`` and ``<label>.percent``.  The loop sleeps for
    ``GRANULARITY`` between rounds and never returns.
    """
    c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.disk')
    while True:
        for path, label in PATHS:
            disk_usage = psutil.disk_usage(path)

            st = os.statvfs(path)
            total_inode = st.f_files
            free_inode = st.f_ffree
            # f_files can be 0 (a file system without a fixed inode count
            # reports it that way); skip the gauge instead of dividing by
            # zero and killing the reporting loop.
            if total_inode:
                inode_percentage = int(100 * (float(total_inode - free_inode) / total_inode))
                c.gauge('%s.inodes.percent' % label, inode_percentage)

            c.gauge('%s.total' % label, disk_usage.total)
            c.gauge('%s.used' % label, disk_usage.used)
            c.gauge('%s.free' % label, disk_usage.free)
            c.gauge('%s.percent' % label, disk_usage.percent)

        time.sleep(GRANULARITY)
def df(fs=".", blocksize=1024):
    """Return the free space on a filesystem (in bytes).

    Args:
        fs: a path on the file system to inspect.
        blocksize: size in bytes of the blocks printed by ``df``; only used
            by the fallback that parses ``df`` output.

    Returns:
        ``f_bavail * f_frsize`` from ``os.statvfs(fs)`` when ``statvfs`` is
        available.  Otherwise the "Available" column of ``df <fs>``
        multiplied by ``blocksize``, or ``None`` when that column cannot be
        found in the output.
    """
    try:
        from os import statvfs
    except ImportError:
        statvfs = None

    if statvfs is not None:
        st = statvfs(fs)
        # f_bavail is counted in f_frsize units; fall back to f_bsize only
        # if the platform leaves f_frsize at zero.
        return int(st.f_bavail) * int(st.f_frsize or st.f_bsize)

    # Not very portable
    import subprocess

    # Argument list instead of a shell string, so paths containing spaces
    # or shell metacharacters reach df unchanged.
    proc = subprocess.Popen(["df", fs], stdout=subprocess.PIPE,
                            universal_newlines=True)
    lines = proc.communicate()[0].splitlines()
    if len(lines) < 2:
        return None

    header = lines[0].split()
    if "Available" not in header:
        return None
    column = header.index("Available")

    values = lines[1].split()
    if column >= len(values):
        return None
    return int(values[column]) * int(blocksize)
def test_structseq(self):
    """Pickle and unpickle struct sequences under every protocol.

    A ``time.localtime()`` value is always tested; ``os.stat(os.curdir)``
    and ``os.statvfs(os.curdir)`` results are added when the ``os`` module
    offers those functions.  Each sample goes through ``self.dumps`` and
    ``self.loads`` for each entry of ``protocols`` and must compare equal
    to the original.

    Keeping the samples in a list (instead of rebinding one variable in the
    loop body) guarantees that all of them are checked for all protocols.
    """
    import time
    import os

    samples = [time.localtime()]
    if hasattr(os, "stat"):
        samples.append(os.stat(os.curdir))
    if hasattr(os, "statvfs"):
        samples.append(os.statvfs(os.curdir))

    for proto in protocols:
        for original in samples:
            pickled = self.dumps(original, proto)
            restored = self.loads(pickled)
            self.assertEqual(original, restored)
# Tests for protocol 2
def test_structseq(self):
    """Round-trip struct sequences and verify each result is a copy.

    Samples are ``time.localtime()`` plus, where the ``os`` module provides
    them, ``os.stat(os.curdir)`` and ``os.statvfs(os.curdir)``.  For every
    entry of ``protocols`` each sample is passed through ``self.dumps`` and
    ``self.loads`` and checked with ``self.assert_is_copy``.

    The samples are gathered up front in a list so that none of them is
    overwritten between protocol iterations; every protocol sees every
    sample.
    """
    import time
    import os

    samples = [time.localtime()]
    if hasattr(os, "stat"):
        samples.append(os.stat(os.curdir))
    if hasattr(os, "statvfs"):
        samples.append(os.statvfs(os.curdir))

    for proto in protocols:
        for original in samples:
            pickled = self.dumps(original, proto)
            restored = self.loads(pickled)
            self.assert_is_copy(original, restored)
def statfs(self):
    """
    Return file system statistics; here, those of the current directory.

    The result should be an object with statvfs attributes (f_bsize,
    f_frsize, ...).  The return value of os.statvfs() is such an object
    (since py 2.2).  If you are not reusing an existing statvfs object,
    start with fuse.StatVFS() and define the attributes.

    To provide usable information (i.e. sensible df(1) output), you are
    advised to specify the following attributes:

        - f_bsize - preferred size of file blocks, in bytes
        - f_frsize - fundamental size of file blocks, in bytes
            [if you have no idea, use the same as blocksize]
        - f_blocks - total number of blocks in the filesystem
        - f_bfree - number of free blocks
        - f_files - total number of file inodes
        - f_ffree - number of free file inodes
    """
    current_dir_stats = os.statvfs(".")
    return current_dir_stats
def statfs(self, path):
    """Return file system statistics for ``path`` as a plain dict.

    When ``common.windows`` is false the dict holds the ``os.statvfs``
    fields f_bavail, f_bfree, f_blocks, f_bsize, f_favail, f_ffree, f_files,
    f_frsize and f_namemax.

    When ``common.windows`` is true the values come from
    ``GetDiskFreeSpaceW``: f_bavail and f_bfree are free clusters times
    sectors per cluster, f_blocks is total clusters times sectors per
    cluster, f_bsize and f_frsize are the bytes per sector, and f_namemax is
    ``wintypes.MAX_PATH``.  ``WindowsError`` is raised if the call fails.
    """
    if not common.windows:
        stv = os.statvfs(path)
        # f_flag is deliberately not copied: the original author observed
        # interpreter crashes when reading it in some cases.
        wanted = ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail',
                  'f_ffree', 'f_files', 'f_frsize', 'f_namemax')
        stats = {}
        for key in wanted:
            stats[key] = getattr(stv, key)
        return stats

    sectors_per_cluster = ctypes.c_ulonglong(0)
    bytes_per_sector = ctypes.c_ulonglong(0)
    free_clusters = ctypes.c_ulonglong(0)
    total_clusters = ctypes.c_ulonglong(0)

    succeeded = windll.kernel32.GetDiskFreeSpaceW(
        ctypes.c_wchar_p(path),
        ctypes.pointer(sectors_per_cluster),
        ctypes.pointer(bytes_per_sector),
        ctypes.pointer(free_clusters),
        ctypes.pointer(total_clusters))
    if not succeeded:
        raise WindowsError

    free_blocks = free_clusters.value * sectors_per_cluster.value
    total_blocks = total_clusters.value * sectors_per_cluster.value
    return {'f_bavail': free_blocks,
            'f_bfree': free_blocks,
            'f_bsize': bytes_per_sector.value,
            'f_frsize': bytes_per_sector.value,
            'f_blocks': total_blocks,
            'f_namemax': wintypes.MAX_PATH}
def test_structseq(self):
    """Verify struct sequences are preserved by pickling, per protocol.

    Tested values: ``time.localtime()`` and, if present in the ``os``
    module, ``os.stat(os.curdir)`` and ``os.statvfs(os.curdir)``.  Every
    value is dumped with ``self.dumps`` and reloaded with ``self.loads`` for
    every entry of ``protocols``; the reloaded value must equal the
    original.

    Each value has its own list slot, so the ``struct_time`` sample is not
    lost after the first protocol iteration.
    """
    import time
    import os

    samples = [time.localtime()]
    if hasattr(os, "stat"):
        samples.append(os.stat(os.curdir))
    if hasattr(os, "statvfs"):
        samples.append(os.statvfs(os.curdir))

    for proto in protocols:
        for original in samples:
            restored = self.loads(self.dumps(original, proto))
            self.assertEqual(original, restored)
# Tests for protocol 2
def _get_stats(mount_point):
data = os.statvfs(mount_point)
total = (data.f_blocks * data.f_bsize)
free = (data.f_bfree * data.f_bsize)
used_percent = 100 - (100.0 * free / total)
total_inode = data.f_files
free_inode = data.f_ffree
used_percent_inode = 100 - (100.0 * free_inode / total_inode)
used = total - free
used_inode = total_inode - free_inode
return {'total': total,
'free': free,
'used_percent': used_percent,
'total_inode': total_inode,
'free_inode': free_inode,
'used_inode': used_inode,
'used': used,
'used_percent_inode': used_percent_inode}
def get_total_disk_space():
    """Return ``(total, free)`` in bytes for the current working directory.

    On platforms whose ``sys.platform`` does not start with ``'win'`` the
    figures are ``f_blocks * f_frsize`` and ``f_bavail * f_frsize`` from
    ``os.statvfs``.  On Windows they are the second and third values filled
    in by ``GetDiskFreeSpaceEx``; the result of ``ctypes.WinError()`` is
    raised when that call returns 0.
    """
    cwd = os.getcwd()

    if not sys.platform.startswith('win'):
        st = os.statvfs(cwd)
        return (st.f_blocks * st.f_frsize, st.f_bavail * st.f_frsize)

    # Windows is the only platform that doesn't support os.statvfs, so it
    # needs this special case.
    unused = ctypes.c_ulonglong()
    total = ctypes.c_ulonglong()
    free = ctypes.c_ulonglong()

    # Pick the wide-character entry point for text paths, the ANSI one for
    # byte strings.
    if sys.version_info >= (3,) or isinstance(cwd, unicode):
        fn = ctypes.windll.kernel32.GetDiskFreeSpaceExW
    else:
        fn = ctypes.windll.kernel32.GetDiskFreeSpaceExA

    if fn(cwd, ctypes.byref(unused), ctypes.byref(total), ctypes.byref(free)) == 0:
        # WinError() will fetch the last error code.
        raise ctypes.WinError()
    return (total.value, free.value)
def get_total_disk_space():
    """Report the size of, and free space on, the current directory's disk.

    Returns a ``(total, free)`` tuple of byte counts.  Windows is handled
    through ``GetDiskFreeSpaceExW`` / ``GetDiskFreeSpaceExA`` (raising the
    result of ``ctypes.WinError()`` when the call returns 0); every other
    platform uses ``os.statvfs`` with ``f_blocks`` and ``f_bavail`` scaled
    by ``f_frsize``.
    """
    cwd = os.getcwd()
    on_windows = sys.platform.startswith('win')

    if on_windows:
        # os.statvfs is unavailable on Windows, so ask the Win32 API.
        caller_avail = ctypes.c_ulonglong()
        total = ctypes.c_ulonglong()
        free = ctypes.c_ulonglong()

        kernel32 = ctypes.windll.kernel32
        use_wide = sys.version_info >= (3,) or isinstance(cwd, unicode)
        fn = kernel32.GetDiskFreeSpaceExW if use_wide else kernel32.GetDiskFreeSpaceExA

        ret = fn(cwd, ctypes.byref(caller_avail), ctypes.byref(total),
                 ctypes.byref(free))
        if ret == 0:
            # WinError() will fetch the last error code.
            raise ctypes.WinError()
        return (total.value, free.value)

    st = os.statvfs(cwd)
    total_bytes = st.f_blocks * st.f_frsize
    free_bytes = st.f_bavail * st.f_frsize
    return (total_bytes, free_bytes)
def _getStats(mountPoint):
    """Collect space and inode usage for the file system at ``mountPoint``.

    Returns a dict with:
        total, free, used        - byte sizes divided by ``ONE_GB_BYTES``
                                   (``used = total - free``)
        used_percent             - percentage of bytes in use
        total_inode, free_inode,
        used_inode               - inode counts
        used_percent_inode       - percentage of inodes in use

    Percentages are computed from the raw byte and inode counts rather than
    from the GB-scaled values, so a volume whose scaled size comes out as
    zero does not raise ``ZeroDivisionError``; a percentage is ``0.0`` when
    its total is zero.
    """
    data = os.statvfs(mountPoint)

    # f_blocks and f_bfree are counted in f_frsize units; fall back to
    # f_bsize only if the platform leaves f_frsize at zero.
    block_size = data.f_frsize or data.f_bsize
    total_bytes = data.f_blocks * block_size
    free_bytes = data.f_bfree * block_size

    total = total_bytes / ONE_GB_BYTES
    free = free_bytes / ONE_GB_BYTES
    used = total - free
    if total_bytes:
        used_percent = 100 - (100.0 * free_bytes / total_bytes)
    else:
        used_percent = 0.0

    total_inode = data.f_files
    free_inode = data.f_ffree
    used_inode = total_inode - free_inode
    if total_inode:
        used_percent_inode = 100 - (100.0 * free_inode / total_inode)
    else:
        used_percent_inode = 0.0

    return {'total': total,
            'free': free,
            'used_percent': used_percent,
            'total_inode': total_inode,
            'free_inode': free_inode,
            'used_inode': used_inode,
            'used': used,
            'used_percent_inode': used_percent_inode}
def disk_usage(path):
    """Return disk usage associated with path.

    The result is ``sdiskusage(total, used, free, percent)`` where, from
    ``os.statvfs(path)``: total is ``f_blocks * f_frsize``, free is
    ``f_bavail * f_frsize``, used is ``(f_blocks - f_bfree) * f_frsize`` and
    percent is ``usage_percent(used, total, _round=1)``.
    """
    try:
        st = os.statvfs(path)
    except UnicodeEncodeError:
        # Only the Python 2 + unicode path case is worked around here;
        # anything else is propagated unchanged.
        if PY3 or not isinstance(path, unicode):
            raise
        # this is a bug with os.statvfs() and unicode on
        # Python 2, see:
        # - https://github.com/giampaolo/psutil/issues/416
        # - http://bugs.python.org/issue18695
        try:
            path = path.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            pass
        st = os.statvfs(path)

    fragment = st.f_frsize
    total = (st.f_blocks * fragment)
    used = (st.f_blocks - st.f_bfree) * fragment
    free = (st.f_bavail * fragment)
    percent = usage_percent(used, total, _round=1)
    # NB: the percentage is -5% than what shown by df due to
    # reserved blocks that we are currently not considering:
    # http://goo.gl/sWGbH
    return sdiskusage(total, used, free, percent)
def get_free_space_bytes(folder):
    """ Return folder/drive total and free space (in bytes)

    Returns a ``(total_bytes, free_bytes)`` tuple.  On Windows
    (``platform.system() == 'Windows'``) the values are filled in by
    ``GetDiskFreeSpaceExW``; its return code is not checked, so both stay 0
    if the call does not write them.  Elsewhere they are
    ``f_blocks * f_frsize`` and ``f_bavail * f_frsize`` from ``os.statvfs``,
    or ``(0, 0)`` when the file system cannot be queried.
    """
    if platform.system() == 'Windows':
        _free_bytes = ctypes.c_ulonglong(0)
        _total_bytes = ctypes.c_ulonglong(0)
        ctypes.windll.kernel32.GetDiskFreeSpaceExW(folder, None, ctypes.pointer(_total_bytes), ctypes.pointer(_free_bytes))
        total_bytes = _total_bytes.value
        free_bytes = _free_bytes.value
    else:
        try:
            st = os.statvfs(folder)
            total_bytes = st.f_blocks * st.f_frsize
            free_bytes = st.f_bavail * st.f_frsize
        except Exception:
            # Best effort: report "nothing available" on any failure, but
            # let KeyboardInterrupt / SystemExit propagate.
            total_bytes = 0
            free_bytes = 0
    return total_bytes, free_bytes