def statvfs():
    """Return ``psutil.disk_usage`` for the configured Docker root directory.

    When the path in ``CONF.docker.root_directory`` does not exist, the
    usage of ``'/'`` is reported instead.
    """
    configured_root = CONF.docker.root_directory
    target = configured_root if os.path.exists(configured_root) else '/'
    return psutil.disk_usage(target)
python类disk_usage()的实例源码
def get_disk_usage(self, device_name):
    """Return the ``psutil.disk_usage`` result for ``device_name``."""
    usage = psutil.disk_usage(device_name)
    return usage
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Loops until interrupted. Each iteration samples per-CPU load (blocking
    for the one-second ``cpu_percent`` interval), memory, root-disk and
    process data, serialises it to JSON and sends it as a two-frame
    multipart message whose first frame is ``b"status"``.

    :param socket: object providing ``send_multipart`` and ``close``
        (presumably a ZeroMQ socket -- confirm against the caller).
    :param context: object providing ``term`` (presumably the ZeroMQ
        context that owns ``socket`` -- confirm against the caller).
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()
            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)
            }).encode()
            # Send the JSON payload on the 'status' channel (the channel
            # frame is not strictly necessary).
            socket.send_multipart([b"status", info_to_send])
            # No extra sleep: cpu_percent(interval=1) already blocks for a
            # second on every pass.
        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Leave the loop; otherwise the next pass would try to send on
            # the socket that was just closed.
            break
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Loops until interrupted. Each iteration samples per-CPU load (blocking
    for the one-second ``cpu_percent`` interval), memory, root-disk and
    process data, serialises it to JSON and sends it as a two-frame
    multipart message whose first frame is ``b"status"``.

    :param socket: object providing ``send_multipart`` and ``close``
        (presumably a ZeroMQ socket -- confirm against the caller).
    :param context: object providing ``term`` (presumably the ZeroMQ
        context that owns ``socket`` -- confirm against the caller).
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()
            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)
            }).encode()
            # Send the JSON payload on the 'status' channel (the channel
            # frame is not strictly necessary).
            socket.send_multipart([b"status", info_to_send])
            # No extra sleep: cpu_percent(interval=1) already blocks for a
            # second on every pass.
        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Leave the loop; otherwise the next pass would try to send on
            # the socket that was just closed.
            break
def actions(self):
    """Return a one-element ``AcLowHDD`` list when '/' usage exceeds ``self.level``.

    An empty list is returned when the used percentage is at or below the
    level.
    """
    used_percent = psutil.disk_usage('/').percent
    return [AcLowHDD(self.chat_id)] if used_percent > self.level else []
def asString(self):
    """Format the used percentage of '/' into this object's message template."""
    used_percent = psutil.disk_usage('/').percent
    return '???? ????? ?? %u%%' % used_percent
def get_device_info(path):
    """Return humanized total/used/free figures for the filesystem at ``path``.

    ``path`` is resolved with ``os.path.realpath`` before being queried.
    If an ``OSError`` is raised, a dict with the single key ``"ERROR"``
    holding the error text is returned instead.
    """
    try:
        usage = psutil.disk_usage(os.path.realpath(path))
        return {
            field: humanize_bytes(getattr(usage, field))
            for field in ("total", "used", "free")
        }
    except OSError as exc:
        return {"ERROR": str(exc)}
system_utilities.py 文件源码
项目:CommunityCellularManager
作者: facebookincubator
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def get_data(self):
    """Collect CPU, memory, disk and network-delta utilization figures.

    Returns a dict with ``cpu_percent``, ``memory_percent``,
    ``disk_percent``, ``bytes_sent_delta`` and ``bytes_received_delta``.
    The stored ``last_bytes_sent`` / ``last_bytes_received`` counters are
    updated on every call.
    """
    cpu = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory().percent
    disk = psutil.disk_usage('/').percent
    counters = psutil.net_io_counters()

    # Deltas are system-wide network usage, not necessarily GPRS-related.
    # A stored counter of 0 is treated as "no previous sample", giving a
    # delta of 0.
    # TODO(matt): query on a specific interface..which one, I'm not sure.
    sent_now = counters.bytes_sent
    sent_delta = 0 if self.last_bytes_sent == 0 else sent_now - self.last_bytes_sent
    self.last_bytes_sent = sent_now

    received_now = counters.bytes_recv
    received_delta = (
        0 if self.last_bytes_received == 0
        else received_now - self.last_bytes_received)
    self.last_bytes_received = received_now

    stats = {}
    stats['cpu_percent'] = cpu
    stats['memory_percent'] = memory
    stats['disk_percent'] = disk
    stats['bytes_sent_delta'] = sent_delta
    stats['bytes_received_delta'] = received_delta
    return stats
def process_size(path, id_list, item_type, asset_type, overwrite):
    """Print the combined size of active assets next to the free disk space.

    For every id in ``id_list`` the asset metadata is fetched. When the
    requested ``asset_type`` is ``'active'``, its download location is
    requested and the ``Content-Length`` header is added to a running total
    (in GiB). Only the headers are used; the body is never read.

    :param path: accepted for interface compatibility; see the note below.
    :param id_list: iterable of item ids to inspect.
    :param item_type: item type formatted into ``ASSET_URL``.
    :param asset_type: key of the asset inside the metadata JSON.
    :param overwrite: not used by this function.
    :return: list with one entry per id -- the metadata response object for
        active assets, ``False`` otherwise.
    """
    results = []
    summation = 0
    # NOTE(review): the ``path`` argument is immediately replaced by the
    # module-level ``args.size``. Kept as-is because callers may rely on it;
    # confirm whether the parameter should win.
    path = args.size
    spc = psutil.disk_usage(path).free
    remain = float(spc) / 1073741824  # bytes -> GiB
    # One pool for all header lookups instead of a new one per item.
    pool = PoolManager()
    for item_id in id_list:
        url = ASSET_URL.format(item_type, item_id)
        logging.info('Request: {}'.format(url))
        result = SESSION.get(url)
        check_status(result)
        try:
            asset = result.json()[asset_type]
            if asset['status'] == 'active':
                download_url = asset['location']
                response = pool.request("GET", download_url, preload_content=False)
                try:
                    content_bytes = response.headers.get("Content-Length")
                finally:
                    # Only the headers are needed; close the streamed
                    # response so its connection is not leaked.
                    response.close()
                print("Item-ID: "+str(item_id))
                if content_bytes is None:
                    # Without a Content-Length the size is unknown; skip it
                    # rather than crash on float(None).
                    logging.warning(
                        'No Content-Length for item %s; size not counted',
                        item_id)
                else:
                    summation += float(content_bytes) / 1073741824
            else:
                result = False
        except KeyError:
            print('Could not check activation status - asset type \'{}\' not found for {}'.format(asset_type, item_id))
            result = False
        results.append(result)
    print("Remaining Space in MB",format(float(remain*1024),'.2f'))
    print("Remaining Space in GB",format(float(remain),'.2f'))
    print ("Total Size in MB",format(float(summation*1024),'.2f'))
    print ("Total Size in GB",format(float(summation),'.2f'))
    return results
def collect_system_information(self):
    """Gather memory, CPU, GPU, disk and network facts into one dict.

    Interfaces that are up and whose name does not start with ``lo`` are
    also appended to ``self.nets``. Errors raised while enumerating GPUs,
    reading a single disk, or reading network interfaces are suppressed.
    """
    mem = psutil.virtual_memory()
    import cpuinfo
    cpu = cpuinfo.get_cpu_info()

    values = {
        'memory_total': mem.total,
        'resources_limit': self.resources_limit,
        'cpu_name': cpu['brand'],
        'cpu': [cpu['hz_advertised_raw'][0], cpu['count']],
        'nets': {},
        'disks': {},
        'gpus': {},
        'boot_time': psutil.boot_time(),
    }
    gpus, disks, nets = values['gpus'], values['disks'], values['nets']

    try:
        for gpu_id, gpu in enumerate(aetros.cuda_gpu.get_ordered_devices()):
            gpu['available'] = gpu_id in self.enabled_gpus
            gpus[gpu_id] = gpu
    except Exception:
        pass

    for part in psutil.disk_partitions():
        mount = part[1]
        try:
            disk_name = self.get_disk_name(mount)
            disks[disk_name] = psutil.disk_usage(mount).total
        except Exception:
            # suppress Operation not permitted
            pass

    try:
        for iface, stats in psutil.net_if_stats().items():
            if iface.startswith('lo') or not stats.isup:
                continue
            self.nets.append(iface)
            # Fall back to 1000 when the reported speed is falsy.
            nets[iface] = stats.speed or 1000
    except Exception:
        # suppress Operation not permitted
        pass

    return values
def partition(self, partition):
    """
    Disk usage of a single partition.

    ``partition`` is converted with ``str()`` and passed to
    ``psutil.disk_usage``; that call's result is returned unchanged.
    """
    return psutil.disk_usage(str(partition))
def test_disk_usage(self):
    # Run psutil.disk_usage on the current directory through self.execute.
    func, path = psutil.disk_usage, '.'
    self.execute(func, path)
def test_disks(self):
    # Compare psutil.disk_usage() and psutil.disk_partitions() against the
    # output of "df -k" for every partition returned with all=False.
    def df(path):
        # Return (device, total, used, free) in bytes as reported by df.
        out = sh('df -k "%s"' % path).strip()
        lines = out.split('\n')
        lines.pop(0)  # drop the header row
        line = lines.pop(0)
        dev, total, used, free = line.split()[:4]
        if dev == 'none':
            dev = ''
        # df -k reports 1 KiB blocks; convert to bytes.
        total = int(total) * 1024
        used = int(used) * 1024
        free = int(free) * 1024
        return dev, total, used, free

    # 10 MB tolerance for free/used
    tolerance = 10 * 1024 * 1024
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        dev, total, used, free = df(part.mountpoint)
        self.assertEqual(part.device, dev)
        self.assertEqual(usage.total, total)
        if abs(usage.free - free) > tolerance:
            # Both values go in one tuple: the format string has two
            # placeholders and fail() accepts a single message.
            self.fail("psutil=%s, df=%s" % (usage.free, free))
        if abs(usage.used - used) > tolerance:
            self.fail("psutil=%s, df=%s" % (usage.used, used))
def test_disk_usage(self):
    # Delegate to the assert_stdout helper for the 'disk_usage.py' script.
    script = 'disk_usage.py'
    self.assert_stdout(script)
def test_disk_usage(self):
    # Only checks that the call completes for self.udir; the result is
    # discarded.
    target = self.udir
    psutil.disk_usage(target)
def test_disk_usage(self):
    # Sanity-check the figures reported for the current working directory.
    cwd_usage = psutil.disk_usage(os.getcwd())
    assert cwd_usage.total > 0, cwd_usage
    assert cwd_usage.used > 0, cwd_usage
    assert cwd_usage.free > 0, cwd_usage
    assert cwd_usage.total > cwd_usage.used, cwd_usage
    assert cwd_usage.total > cwd_usage.free, cwd_usage
    assert 0 <= cwd_usage.percent <= 100, cwd_usage.percent

    # Cross-check against shutil.disk_usage when it is available
    # (py >= 3.3, see: http://bugs.python.org/issue12442).
    if hasattr(shutil, 'disk_usage'):
        reference = shutil.disk_usage(os.getcwd())
        five_mb = 5 * 1024 * 1024
        self.assertEqual(cwd_usage.total, reference.total)
        self.assertAlmostEqual(
            cwd_usage.free, reference.free, delta=five_mb)
        self.assertAlmostEqual(
            cwd_usage.used, reference.used, delta=five_mb)

    # A path that does not exist is expected to raise OSError with ENOENT
    # on all platforms.
    missing_path = tempfile.mktemp()
    got_enoent = False
    try:
        psutil.disk_usage(missing_path)
    except OSError as err:
        if err.args[0] != errno.ENOENT:
            raise
        got_enoent = True
    if not got_enoent:
        self.fail("OSError not raised")
def test_disk_usage_unicode(self):
    # disk_usage() must accept a directory with a unicode name;
    # see: https://github.com/giampaolo/psutil/issues/416
    unicode_dir = TESTFN_UNICODE
    # Start from a clean slate and register removal for when the test ends.
    safe_rmpath(unicode_dir)
    self.addCleanup(safe_rmpath, unicode_dir)
    os.mkdir(unicode_dir)
    psutil.disk_usage(unicode_dir)
def test_disk_usage(self):
    # Compare psutil.disk_usage() with "df -k" for each partition's device.
    def df(device):
        # Parse the second output line of df into
        # (total, used, free, percent), sizes converted from KiB to bytes.
        output = sh("df -k %s" % device).strip()
        fields = output.split('\n')[1].split()
        total, used, free = (int(value) * 1024 for value in fields[1:4])
        percent = float(fields[4].replace('%', ''))
        return (total, used, free, percent)

    tolerance = 4 * 1024 * 1024  # 4MB
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        try:
            total, used, free, percent = df(part.device)
        except RuntimeError as err:
            # see:
            # https://travis-ci.org/giampaolo/psutil/jobs/138338464
            # https://travis-ci.org/giampaolo/psutil/jobs/138343361
            message = str(err).lower()
            if ("no such file or directory" in message or
                    "raw devices not supported" in message):
                continue
            raise
        self.assertAlmostEqual(usage.total, total, delta=tolerance)
        self.assertAlmostEqual(usage.used, used, delta=tolerance)
        self.assertAlmostEqual(usage.free, free, delta=tolerance)
        self.assertAlmostEqual(usage.percent, percent, delta=1)
def GetStats():
    """Return rounded CPU, memory and ``C:\\`` disk usage percentages as a dict."""
    return {
        "cpu": round(psutil.cpu_percent()),  # Used CPU
        "memory": round(psutil.virtual_memory().percent),  # Used memory
        "disk": round(psutil.disk_usage('C:\\').percent),  # Used disk
    }
def update_disks(self):
    """Rebuild ``self.disks`` from the partitions psutil currently reports.

    Each entry is keyed by mountpoint and stores the partition's device,
    filesystem type, options and usage figures. ``real_device`` starts as
    the device path and is replaced by its resolved path when the device is
    a symbolic link.
    """
    self.disks.clear()
    for part in psutil.disk_partitions(all=False):
        mountpoint = part.mountpoint
        usage = psutil.disk_usage(mountpoint)
        self.disks[mountpoint] = {
            'device': part.device,
            'real_device': part.device,
            'fstype': part.fstype,
            'opts': part.opts,
            'total': usage.total,
            'used': usage.used,
            'free': usage.free,
            'percent': usage.percent,
        }
        if os.path.islink(part.device):
            self.disks[mountpoint]['real_device'] = os.path.realpath(part.device)
# ?????????????? ??? ???