def tell_system_status(self):
    """Build a spoken-style summary of the host's current status.

    Returns:
        A single string naming the operating system and its release, the
        host name, the CPU core count, the disk, CPU and memory
        utilisation percentages, and the date the machine was booted.
    """
    import psutil
    import platform
    import datetime

    # Only the first three uname() fields (system, node, release) are used.
    system, node, release = platform.uname()[:3]
    # Keep only the part of the release string before the first dash.
    release = release.split('-')[0]

    core_count = psutil.cpu_count()
    cpu_load = psutil.cpu_percent()
    memory_load = psutil.virtual_memory().percent
    disk_load = psutil.disk_usage('/').percent
    booted_at = datetime.datetime.fromtimestamp(psutil.boot_time())

    sentences = [
        "I am currently running on %s version %s. " % (system, release),
        "This system is named %s and has %s CPU cores. " % (node, core_count),
        "Current disk_percent is %s percent. " % disk_load,
        "Current CPU utilization is %s percent. " % cpu_load,
        "Current memory utilization is %s percent. " % memory_load,
        "it's running since %s." % booted_at.strftime("%A %d. %B %Y"),
    ]
    return "".join(sentences)
python类disk_usage()的实例源码
def test_disk_partitions_and_usage(self):
    """Cross-check psutil.disk_usage() for each partition against ``df``.

    The total size must match exactly; the used and free figures may each
    differ from df's by at most 10 MB.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB

    def df(path):
        # Ask df for POSIX output in 1-byte blocks and parse the first
        # data row, i.e. the line right after the header.
        output = sh('df -P -B 1 "%s"' % path).strip()
        row = output.split('\n')[1]
        dev, total, used, free = row.split()[:4]
        if dev == 'none':
            dev = ''
        return dev, int(total), int(used), int(free)

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        _, total, used, free = df(part.mountpoint)
        self.assertEqual(usage.total, total)
        for measured, expected in ((usage.free, free), (usage.used, used)):
            if abs(measured - expected) > tolerance:
                self.fail("psutil=%s, df=%s" % (measured, expected))
def get_file_systems(self):
    """Describe every partition reported by psutil.disk_partitions().

    Returns:
        A list with one dict per partition holding its device, mount
        point, filesystem type, a mapping of mount option to localized
        label, and the psutil.disk_usage() result for the mount point.
    """
    file_systems = []
    for partition in psutil.disk_partitions():
        option_labels = {}
        for option in partition.opts.split(','):
            label = prism.helpers.locale_('system', 'mount.options.' + option)
            # A lookup that hands the option back unchanged is treated as
            # having no label, so the generic "unknown" label is used.
            if label == option:
                label = prism.helpers.locale_('system', 'mount.options.unknown')
            option_labels[option] = label
        file_systems.append({
            'device': partition.device,
            'mount_point': partition.mountpoint,
            'fs_type': partition.fstype,
            'options': option_labels,
            'usage': psutil.disk_usage(partition.mountpoint),
        })
    return file_systems
def disksinfo(self):
    """Return usage details for each partition, ordered by device name.

    Returns:
        A list of dicts (device, mountpoint, fstype, opts, total, used,
        free, percent), one per entry of
        psutil.disk_partitions(all=False).
    """
    devices = []
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        devices.append({
            'device': part.device,
            'mountpoint': part.mountpoint,
            'fstype': part.fstype,
            'opts': part.opts,
            'total': usage.total,
            'used': usage.used,
            'free': usage.free,
            'percent': usage.percent,
        })
    devices.sort(key=lambda entry: entry['device'])
    return devices
def get_stats():
    """Render the "stats" template with job counts and host utilisation.

    The template receives the number of jobs in the queued ('Q'), running
    ('R') and completed ('C') states, the CPU, memory and root-disk usage
    percentages, and the ``cid`` / ``app`` query parameters.
    """
    root.authorized()

    # Template key -> job state code.
    state_keys = (('nq', 'Q'), ('nr', 'R'), ('nc', 'C'))
    params = {}
    for key, state in state_keys:
        params[key] = db(jobs.state == state).count()

    params['cpu'] = psutil.cpu_percent()
    params['vm'] = psutil.virtual_memory().percent
    params['disk'] = psutil.disk_usage('/').percent

    query = request.query
    params['cid'] = query.cid
    params['app'] = query.app
    return template("stats", params)
def sysInfoRaw():
    """Return raw utilisation figures and uptime for the host.

    Returns:
        ``[cpu_percent, memory_percent, root_disk_percent,
        home_disk_percent, [days, hours, minutes, seconds]]``; the CPU
        figure is sampled over a one-second interval.
    """
    import time
    import psutil

    cpu_load = psutil.cpu_percent(interval=1)
    memory_load = psutil.virtual_memory().percent
    root_load = psutil.disk_usage('/').percent
    home_load = psutil.disk_usage('/home').percent

    # Break the whole seconds elapsed since boot into d/h/m/s.
    elapsed = int(time.time()) - int(psutil.boot_time())
    minutes, secs = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    return [cpu_load, memory_load, root_load, home_load,
            [days, hours, minutes, secs]]
def test_serialization(self):
    """Check that psutil return values can be JSON-encoded and pickled.

    Each value is dumped to and reloaded from JSON (when the json module
    is available) and must compare equal to itself after a pickle round
    trip.
    """
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        restored = pickle.loads(pickle.dumps(ret))
        self.assertEqual(ret, restored)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())

    # Disk I/O counters are skipped on Linux hosts without
    # /proc/diskstats and whenever APPVEYOR is set.
    diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
    if not diskstats_missing and not APPVEYOR:
        check(psutil.disk_io_counters())

    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def handle_drive_intent(self, message):
    """Speak the disk usage of every partition not mounted under /snap/.

    Partitions that are at least 90 percent full use the "drive.low"
    dialog; all others use "drive".

    Args:
        message: The triggering message; not read by this handler.
    """
    for partition in psutil.disk_partitions():
        mountpoint = partition.mountpoint
        print("partition.mountpoint: %s" % mountpoint)
        if mountpoint.startswith("/snap/"):
            continue

        usage = psutil.disk_usage(mountpoint)
        data = {
            "mountpoint": mountpoint,
            "total": sizeof_fmt(usage.total),
            "used": sizeof_fmt(usage.used),
            "free": sizeof_fmt(usage.free),
            "percent": usage.percent,
        }
        dialog = "drive.low" if usage.percent >= 90 else "drive"
        self.speak_dialog(dialog, data)
def disk():
    """Report disk and inode usage of every configured path to statsd.

    For each ``(path, label)`` pair in ``PATHS`` the loop emits gauges
    for the used-inode percentage and for total, used and free bytes plus
    the used-space percentage. It then sleeps ``GRANULARITY`` seconds
    and repeats. This function never returns.
    """
    c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.disk')
    while True:
        for path, label in PATHS:
            disk_usage = psutil.disk_usage(path)
            st = os.statvfs(path)
            total_inode = st.f_files
            # Some filesystems report f_files == 0; skip the inode gauge
            # there instead of dividing by zero and killing the loop.
            if total_inode:
                used_inode = total_inode - st.f_ffree
                inode_percentage = int(100 * (float(used_inode) / total_inode))
                c.gauge('%s.inodes.percent' % label, inode_percentage)
            c.gauge('%s.total' % label, disk_usage.total)
            c.gauge('%s.used' % label, disk_usage.used)
            c.gauge('%s.free' % label, disk_usage.free)
            c.gauge('%s.percent' % label, disk_usage.percent)
        time.sleep(GRANULARITY)
def POST(self):
    """Return a JSON snapshot of CPU, memory, disk, process count and uptime.

    Returns:
        A JSON string ``{"code": 0, "current": {...}}`` whose ``current``
        object holds ``cpuused``, ``memused``, ``diskused`` (percentages),
        ``pidcount`` and ``uptime``, the ``[stdout, stderr]`` pair
        captured from the ``uptime`` command.
    """
    cpuused = psutil.cpu_percent(interval=None, percpu=False)
    memused = psutil.virtual_memory().percent
    diskused = psutil.disk_usage('/').percent
    pidcount = len(psutil.pids())

    # universal_newlines=True makes communicate() return text rather than
    # bytes, which json.dumps() cannot serialise on Python 3.
    uptimeshell = subprocess.Popen(['uptime'],
                                   stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   universal_newlines=True)
    # communicate() drains both pipes and waits for the child to exit.
    # Calling wait() first can deadlock once a pipe buffer fills up.
    uptime = uptimeshell.communicate()

    return json.dumps(
        {
            "code": 0,
            "current": {
                "cpuused": cpuused,
                "memused": memused,
                "diskused": diskused,
                "pidcount": pidcount,
                "uptime": uptime
            }
        }
    )
def cpuinfo():
    """Append a memory/disk/CPU snapshot to eggs.csv, then print the file.

    Memory figures are written in MB and disk figures (for '/') in GB,
    each truncated to an integer. After appending, every row of the file
    is read back and printed.
    """
    mib = 1024 * 1024
    gib = 1024 * 1024 * 1024

    memory = psutil.virtual_memory()
    mem_available = memory.available / mib
    mem_total = memory.total / mib
    mem_used = memory.used / mib

    root = psutil.disk_usage('/')
    disk_total = root.total / gib
    disk_used = root.used / gib
    disk_free = root.free / gib

    now = datetime.datetime.now()

    with open('eggs.csv', 'a') as out:
        writer = csv.writer(out, delimiter=' ',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        # Each row is a single text cell.
        writer.writerow(["??????%s" % now])
        writer.writerow(["cpu?????%d%s" % (psutil.cpu_percent(interval=1), "%")])
        writer.writerow(["???????%sMB" % int(mem_total)])
        writer.writerow(["?????????%sMB" % int(mem_used)])
        writer.writerow(["????????%sMB" % int(mem_available)])
        writer.writerow(["???????%sG" % int(disk_total)])
        writer.writerow(["???????%sG" % int(disk_used)])
        writer.writerow(["???????%sG" % int(disk_free)])

    with open('eggs.csv', 'r') as source:
        for row in csv.reader(source, delimiter=' ', quotechar='|'):
            print(row)
def _get_docker_node_info(info):
    """Add docker-specific capacity figures to *info* and return it.

    The keys ``memory``, ``disk``, ``cpu`` and ``up_since`` are set on
    the given mapping: 90% of total memory in MB, the free space of the
    docker data directory in MB, 100% per CPU, and the ``up_since()``
    value.
    """
    cpu_capacity = int(cpu_count() * 100)
    mem_capacity = (psutil.virtual_memory().total * 0.9) // _BYTES_IN_MB

    # TODO: manage disk space a little better
    docker_root = 'C:\\ProgramData\\docker' if os.name == 'nt' else '/var/lib/docker'
    disk_free = disk_usage(docker_root).free // _BYTES_IN_MB

    info.update({
        'memory': '%dM' % mem_capacity,
        'disk': '%dM' % disk_free,
        'cpu': '%d%%' % cpu_capacity,
        'up_since': up_since(),
    })
    return info
def readGcodeFilesForOrigin(origin):
    """Return the file listing for *origin* as a JSON response.

    Unknown origins yield a 404 response. The ``recursive`` and ``force``
    request values are read as booleans; ``force`` drops the cached entry
    for the origin before listing. For the local origin the response also
    carries the free and total space of the uploads folder.
    """
    if origin not in (FileDestinations.LOCAL, FileDestinations.SDCARD):
        return make_response("Unknown origin: %s" % origin, 404)

    def flag(name):
        return request.values.get(name, "false") in valid_boolean_trues

    recursive = flag("recursive")
    force = flag("force")

    if force:
        with _file_cache_mutex:
            # A missing cache entry is not an error.
            _file_cache.pop(origin, None)

    files = _getFileList(origin, recursive=recursive)

    if origin != FileDestinations.LOCAL:
        return jsonify(files=files)

    usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
    return jsonify(files=files, free=usage.free, total=usage.total)
def CurrentSpecs():
    """Return the host's IP, CPU temperature and usage, disk and RAM usage.

    Returns:
        A string of five fields joined by ``':_:'`` in the order private
        IP, CPU temperature, CPU usage, root-disk usage percent and RAM
        usage percent. The first three come from helper functions and
        are concatenated as-is, so they must already be strings.
    """
    PrivateIP = getPrivateIP()
    CPU_temp = getCPUtemperature()
    CPU_usage = getCPUusage()
    # psutil.phymem_usage() was deprecated in psutil 0.6.0 and later
    # removed; virtual_memory() is its replacement and exposes the same
    # ``percent`` field.
    RAM_usage = psutil.virtual_memory().percent
    DISK_usage = psutil.disk_usage('/').percent
    return (PrivateIP + ':_:' + CPU_temp + ':_:' + CPU_usage + ':_:' +
            str(DISK_usage) + ':_:' + str(RAM_usage))
def get_disk_info(self):
    """List partitions that have a device and are not CD-ROM or removable.

    Returns:
        A list of dicts (``name``, ``device``, ``mountpoint``, ``fstype``
        and ``size``, the total size shifted right by 10 bits, i.e.
        divided by 1024) sorted by device. If anything fails, the
        traceback is logged and an empty list is returned.
    """
    skipped_kinds = {'CDROM', 'REMOVABLE'}
    data = []
    try:
        for disk in psutil.disk_partitions(all=True):
            if not disk.device:
                continue
            # opts is a comma-separated option string (e.g. "ro,cdrom"),
            # so match individual options rather than the whole string.
            if skipped_kinds.intersection(disk.opts.upper().split(',')):
                continue
            data.append({
                'name': disk.device,
                'device': disk.device,
                'mountpoint': disk.mountpoint,
                'fstype': disk.fstype,
                'size': psutil.disk_usage(disk.mountpoint).total >> 10,
            })
        data.sort(key=lambda x: x['device'])
    except Exception:
        # Best effort: report nothing rather than a partial list.
        data = []
        self.logger.error(traceback.format_exc())
    return data
def update(self):
    """Refresh every metric stored on this object.

    Updates the CPU percentage (sampled over 0.7 s), the formatted boot
    time, the used/free/cached memory, the sent/received packet counts,
    the root-disk total/used/free (each divided by 1024 and truncated)
    and the timestamp.
    """
    self.cpu["percentage"] = psutil.cpu_percent(interval=0.7)

    booted_at = datetime.datetime.fromtimestamp(psutil.boot_time())
    self.boot = booted_at.strftime("%Y-%m-%d %H:%M:%S")

    memory = psutil.virtual_memory()
    for field in ("used", "free", "cached"):
        self.memory[field] = getattr(memory, field)

    counters = psutil.net_io_counters()
    self.network["packet_sent"] = counters.packets_sent
    self.network["packet_recv"] = counters.packets_recv

    root = psutil.disk_usage('/')
    for field in ("total", "used", "free"):
        self.disk[field] = int(getattr(root, field) / 1024)

    self.timestamp = time.time()
def test_disk_partitions_and_usage(self):
    """Cross-check psutil.disk_usage() for each partition against ``df``.

    The total size must match exactly; the used and free figures may each
    differ from df's by at most 10 MB.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB

    def df(path):
        # Ask df for POSIX output in 1-byte blocks and parse the first
        # data row, i.e. the line right after the header.
        output = sh('df -P -B 1 "%s"' % path).strip()
        row = output.split('\n')[1]
        dev, total, used, free = row.split()[:4]
        if dev == 'none':
            dev = ''
        return dev, int(total), int(used), int(free)

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        _, total, used, free = df(part.mountpoint)
        self.assertEqual(usage.total, total)
        for measured, expected in ((usage.free, free), (usage.used, used)):
            if abs(measured - expected) > tolerance:
                self.fail("psutil=%s, df=%s" % (measured, expected))
def test_serialization(self):
    """Check that psutil return values can be JSON-encoded and pickled.

    Each value is dumped to and reloaded from JSON (when the json module
    is available) and must compare equal to itself after a pickle round
    trip.
    """
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        restored = pickle.loads(pickle.dumps(ret))
        self.assertEqual(ret, restored)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())

    # Disk I/O counters are skipped on Linux hosts without
    # /proc/diskstats and whenever APPVEYOR is set.
    diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
    if not diskstats_missing and not APPVEYOR:
        check(psutil.disk_io_counters())

    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def test_disk_partitions_and_usage(self):
    """Cross-check psutil.disk_usage() for each partition against ``df``.

    The total size must match exactly; the used and free figures may each
    differ from df's by at most 10 MB.
    """
    tolerance = 10 * 1024 * 1024  # 10 MB

    def df(path):
        # Ask df for POSIX output in 1-byte blocks and parse the first
        # data row, i.e. the line right after the header.
        output = sh('df -P -B 1 "%s"' % path).strip()
        row = output.split('\n')[1]
        dev, total, used, free = row.split()[:4]
        if dev == 'none':
            dev = ''
        return dev, int(total), int(used), int(free)

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        _, total, used, free = df(part.mountpoint)
        self.assertEqual(usage.total, total)
        for measured, expected in ((usage.free, free), (usage.used, used)):
            if abs(measured - expected) > tolerance:
                self.fail("psutil=%s, df=%s" % (measured, expected))
def test_serialization(self):
    """Check that psutil return values can be JSON-encoded and pickled.

    Each value is dumped to and reloaded from JSON (when the json module
    is available) and must compare equal to itself after a pickle round
    trip.
    """
    def check(ret):
        if json is not None:
            json.loads(json.dumps(ret))
        restored = pickle.loads(pickle.dumps(ret))
        self.assertEqual(ret, restored)

    check(psutil.Process().as_dict())
    check(psutil.virtual_memory())
    check(psutil.swap_memory())
    check(psutil.cpu_times())
    check(psutil.cpu_times_percent(interval=0))
    check(psutil.net_io_counters())

    # Disk I/O counters are skipped on Linux hosts without
    # /proc/diskstats and whenever APPVEYOR is set.
    diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
    if not diskstats_missing and not APPVEYOR:
        check(psutil.disk_io_counters())

    check(psutil.disk_partitions())
    check(psutil.disk_usage(os.getcwd()))
    check(psutil.users())
def get_disk_rate_info(self):
    """Collect size and usage figures per mount point.

    Returns:
        A dict with three mappings keyed by mount point: ``disk_total``
        and ``disk_used`` (sizes divided down by 1024 three times,
        formatted with two decimals) and ``disk_percent``. Partitions
        with an empty filesystem type are skipped. On failure the
        traceback is reported and whatever was gathered so far is
        returned.
    """
    totals = {}
    useds = {}
    percents = {}
    returnData = {
        'disk_total': totals,
        'disk_used': useds,
        'disk_percent': percents,
    }
    try:
        for partition in psutil.disk_partitions():
            if partition.fstype == "":
                continue
            mountpoint = partition.mountpoint
            usage = psutil.disk_usage(mountpoint)
            totals[mountpoint] = "%.2f" % (usage.total/1024/1024/1024.0)
            useds[mountpoint] = "%.2f" % (usage.used/1024/1024/1024.0)
            percents[mountpoint] = usage.percent
    except Exception:
        pybixlib.error(self.logHead + traceback.format_exc())
        self.errorInfoDone(traceback.format_exc())
    return returnData
def main():
    """Print a borderless, left-aligned table of disk usage per partition.

    On Windows, CD-ROM drives and partitions without a filesystem type
    are skipped. Mount points containing both 'docker' and 'aufs' are
    skipped as well.
    """
    table = prettytable.PrettyTable(border=False, header=True, left_padding_width=2, padding_width=1)
    table.field_names = ["Device", "Total", "Used", "Free", "Use%", "Type", "Mount"]
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        if 'docker' in part.mountpoint and 'aufs' in part.mountpoint:
            continue
        usage = psutil.disk_usage(part.mountpoint)
        table.add_row([part.device,
                       bytes2human(usage.total),
                       bytes2human(usage.used),
                       bytes2human(usage.free),
                       str(int(usage.percent)) + '%',
                       part.fstype,
                       part.mountpoint])
    for field in table.field_names:
        table.align[field] = "l"
    # The call form works on both Python 2 and 3; the bare
    # ``print table`` statement is a SyntaxError on Python 3.
    print(table)
def freeSpace(dir):
    """Return the free space reported by psutil for the disk holding *dir*.

    Args:
        dir: Path passed to ``psutil.disk_usage``.

    Returns:
        The free space as an int, or None (after logging an error) when
        psutil cannot be imported or the lookup fails.
    """
    try:
        import psutil
    except ImportError:
        # Only a missing module is expected here; a bare except would
        # also swallow KeyboardInterrupt and SystemExit.
        logger.sendToLog("psutil in not installed!", "ERROR")
        return None

    try:
        return int(psutil.disk_usage(dir).free)
    except Exception as e:
        # log in to the log file
        logger.sendToLog("persepolis couldn't find free space value:\n" + str(e), "ERROR")
        return None
def collect_diskinfo(self):
    """Gather usage figures for each distinct device.

    Only the first partition seen for a device is reported. When a mount
    point starts with '/opt/docklet/local/volume', its last path
    component is used as a container name and the entry is also stored
    under ``workercinfo[container]['disk_use']``. Partitions whose usage
    lookup fails are logged and left out of the result.

    Returns:
        A list of dicts with device, mountpoint, total, used, free and
        percent.
    """
    global workercinfo
    volume_prefix = '/opt/docklet/local/volume'
    seen_devices = set()
    collected = []
    for part in psutil.disk_partitions():
        if part.device in seen_devices:
            continue
        seen_devices.add(part.device)

        entry = {'device': part.device, 'mountpoint': part.mountpoint}
        try:
            usage = psutil.disk_usage(part.mountpoint)
            entry['total'] = usage.total
            entry['used'] = usage.used
            entry['free'] = usage.free
            entry['percent'] = usage.percent
            if part.mountpoint.startswith(volume_prefix):
                # The last path component names the container.
                container = part.mountpoint.split('/')[-1]
                workercinfo.setdefault(container, {})['disk_use'] = entry
            collected.append(entry)
        except Exception as err:
            logger.warning(traceback.format_exc())
            logger.warning(err)
    return collected
# collect operating system information
def get_disk_used(path='/'):
    """Return the ``used`` figure psutil reports for the disk holding *path*."""
    return psutil.disk_usage(path).used
def get_disk_free(path='/'):
    """Return the ``free`` figure psutil reports for the disk holding *path*."""
    return psutil.disk_usage(path).free
def get_init_resource():
    """Return the host's initial resource capacities.

    Returns:
        A tuple ``(cpu, physical_memory, disk, total_memory)``: 100 per
        CPU, the total physical memory, the total size of the disk
        holding the configured workspace, and physical plus swap memory.
    """
    cpu = cpu_count() * 100
    physical = psutil.virtual_memory()
    swap = psutil.swap_memory()
    workspace = get_config("env", "workspace")
    disk_total = psutil.disk_usage(workspace).total
    return cpu, physical.total, disk_total, swap.total + physical.total
def process_size(path, id_list, item_type, asset_type, overwrite):
    """Estimate the download size of active assets against free disk space.

    For every item id the asset listing is fetched. When the requested
    asset is active, its download URL is opened without reading the body
    and its Content-Length is added to a running total. The free space
    and the total size are printed at the end.

    Args:
        path: Overridden by ``args.size`` before use (see the note in the
            body).
        id_list: Item ids to query.
        item_type: Item type substituted into ``ASSET_URL``.
        asset_type: Key of the asset entry to inspect in each listing.
        overwrite: Not used by this function.

    Returns:
        A list with one entry per id: the asset-listing response when the
        asset is active, otherwise False.
    """
    bytes_per_gb = 1073741824
    results = []
    summation = 0
    # NOTE(review): the ``path`` argument is replaced by the module-level
    # ``args.size``; kept because callers may rely on it. Confirm.
    path = args.size
    spc = psutil.disk_usage(path).free
    remain = float(spc) / bytes_per_gb
    # One pool serves every request instead of a new one per item.
    pool = PoolManager()
    # now start checking each file
    for item_id in id_list:
        url = ASSET_URL.format(item_type, item_id)
        logging.info('Request: {}'.format(url))
        result = SESSION.get(url)
        check_status(result)
        try:
            asset = result.json()[asset_type]
            if asset['status'] == 'active':
                download_url = asset['location']
                # Only the headers are needed, so the body is not read.
                response = pool.request("GET", download_url, preload_content=False)
                try:
                    content_bytes = response.headers.get("Content-Length")
                finally:
                    # Drop the connection without downloading the
                    # payload, then hand it back to the pool.
                    response.close()
                    response.release_conn()
                print("Item-ID: " + str(item_id))
                # A response without Content-Length adds nothing to the
                # total instead of crashing on float(None).
                if content_bytes is not None:
                    summation = summation + float(content_bytes) / bytes_per_gb
                print(format(float(summation), '.2f'), "GB", end='\r')
            else:
                result = False
        except KeyError:
            print('Could not check activation status - asset type \'{}\' not found for {}'.format(asset_type, item_id))
            result = False
        results.append(result)
    print("Remaining Space in MB", format(float(remain * 1024), '.2f'))
    print("Remaining Space in GB", format(float(remain), '.2f'))
    print("Total Size in MB", format(float(summation * 1024), '.2f'))
    print("Total Size in GB", format(float(summation), '.2f'))
    return results
def get_disk(self):
    """Return the root filesystem's usage percentage truncated to an int."""
    root_usage = psutil.disk_usage('/')
    return int(root_usage.percent)
def get_total_disk(self):
    """Return the total size psutil reports for the root filesystem."""
    root_usage = psutil.disk_usage('/')
    return root_usage.total