def test_serialization(self):
def check(ret):
if json is not None:
json.loads(json.dumps(ret))
a = pickle.dumps(ret)
b = pickle.loads(a)
self.assertEqual(ret, b)
check(psutil.Process().as_dict())
check(psutil.virtual_memory())
check(psutil.swap_memory())
check(psutil.cpu_times())
check(psutil.cpu_times_percent(interval=0))
check(psutil.net_io_counters())
if LINUX and not os.path.exists('/proc/diskstats'):
pass
else:
if not APPVEYOR:
check(psutil.disk_io_counters())
check(psutil.disk_partitions())
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
python类cpu_times_percent()的实例源码
def cpu_times_percent():
c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.cpu')
while True:
value = psutil.cpu_percent(interval=1)
c.gauge('system_wide.percent', value)
cpu_t_percent = psutil.cpu_times_percent(interval=1)
c.gauge('system_wide.times_percent.user', cpu_t_percent.user)
c.gauge('system_wide.times_percent.nice', cpu_t_percent.nice)
c.gauge('system_wide.times_percent.system', cpu_t_percent.system)
c.gauge('system_wide.times_percent.idle', cpu_t_percent.idle)
c.gauge('system_wide.times_percent.iowait', cpu_t_percent.iowait)
c.gauge('system_wide.times_percent.irq', cpu_t_percent.irq)
c.gauge('system_wide.times_percent.softirq', cpu_t_percent.softirq)
c.gauge('system_wide.times_percent.steal', cpu_t_percent.steal)
c.gauge('system_wide.times_percent.guest', cpu_t_percent.guest)
c.gauge('system_wide.times_percent.guest_nice', cpu_t_percent.guest_nice)
time.sleep(GRANULARITY)
def monitor(frist_invoke=2):
cpu = psutil.cpu_times_percent(interval=frist_invoke, percpu=False)
cpu_percent = psutil.cpu_percent(interval=frist_invoke)
value_dic = {
'cpu':{
'cpu.user': cpu.user,
'cpu.nice': cpu.nice,
'cpu.system':cpu.system,
'cpu.idle':cpu.idle,
'cpu.iowait': cpu.iowait,
'cpu.irq': cpu.irq,
'cpu.softirq': cpu.softirq,
'cpu.steal': cpu.steal,
'cpu.guest': cpu.guest,
'cpu.percent': cpu_percent
}
}
return value_dic
def test_serialization(self):
def check(ret):
if json is not None:
json.loads(json.dumps(ret))
a = pickle.dumps(ret)
b = pickle.loads(a)
self.assertEqual(ret, b)
check(psutil.Process().as_dict())
check(psutil.virtual_memory())
check(psutil.swap_memory())
check(psutil.cpu_times())
check(psutil.cpu_times_percent(interval=0))
check(psutil.net_io_counters())
if LINUX and not os.path.exists('/proc/diskstats'):
pass
else:
if not APPVEYOR:
check(psutil.disk_io_counters())
check(psutil.disk_partitions())
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
def test_serialization(self):
def check(ret):
if json is not None:
json.loads(json.dumps(ret))
a = pickle.dumps(ret)
b = pickle.loads(a)
self.assertEqual(ret, b)
check(psutil.Process().as_dict())
check(psutil.virtual_memory())
check(psutil.swap_memory())
check(psutil.cpu_times())
check(psutil.cpu_times_percent(interval=0))
check(psutil.net_io_counters())
if LINUX and not os.path.exists('/proc/diskstats'):
pass
else:
if not APPVEYOR:
check(psutil.disk_io_counters())
check(psutil.disk_partitions())
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
getSystemStatus.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def getLoadAverage():
if linux:
import multiprocessing
k = 1.0
k /= multiprocessing.cpu_count()
if os.path.exists('/proc/loadavg'):
return [float(open('/proc/loadavg').read().split()[x]) * k for x in range(3)]
else:
tokens = subprocess.check_output(['uptime']).split()
return [float(x.strip(',')) * k for x in tokens[-3:]]
if mswindows:
# TODO(Guodong Ding) get this field data like on Linux for Windows
# print psutil.cpu_percent()
# print psutil.cpu_times_percent()
# print psutil.cpu_times()
# print psutil.cpu_stats()
return "%.2f%%" % psutil.cpu_percent()
def collect_cpuinfo(self):
cpuinfo = psutil.cpu_times_percent(interval=1,percpu=False)
cpuset = {}
cpuset['user'] = cpuinfo.user
cpuset['system'] = cpuinfo.system
cpuset['idle'] = cpuinfo.idle
cpuset['iowait'] = cpuinfo.iowait
# get processors information from /proc/cpuinfo
output = subprocess.check_output(["cat /proc/cpuinfo"],shell=True)
output = output.decode('utf-8')
parts = output.split('\n')
info = []
idx = -1
for part in parts:
if not part == '':
key_val = re.split(':',part)
key = key_val[0].rstrip()
if key == 'processor':
info.append({})
idx += 1
val = key_val[1].lstrip()
if key=='processor' or key=='model name' or key=='core id' or key=='cpu MHz' or key=='cache size' or key=='physical id':
info[idx][key] = val
return [cpuset, info]
# collect disk used information
def __get_cpu_usage(self):
return (100.0 - psutil.cpu_times_percent(interval=0.1).idle) / 100.0
def get_system_load(self):
return {
MEM_AVAILABLE: psutil.virtual_memory().available,
CPU_IDLE_PERCENT: psutil.cpu_times_percent(
self.cpu_sample_interval).idle,
CLIENT_NUMBER: len(self.protocol.server.clients),
}
def save(self):
cpu_info = psutil.cpu_times_percent()
msg = {'user': cpu_info.user,
'nice': cpu_info.nice,
'system': cpu_info.system,
'iowait': cpu_info.iowait,
'irq': cpu_info.irq,
'softirq': cpu_info.softirq,
'steal': cpu_info.steal,
'guest': cpu_info.guest,
'guest_nice': cpu_info.guest_nice}
self._save(**msg)
def get_stats(self):
cpct = psutil.cpu_percent(interval=0)
ctimes = psutil.cpu_times_percent()
self.cpu_stats = CpuStats(cpct, ctimes.user, ctimes.system,
ctimes.idle)
self.vmem_stats = psutil.virtual_memory()
self.disk_stats = psutil.disk_io_counters()
self.net_stats = psutil.net_io_counters()
# must create new stats list each time stats are updated
# because named tuples are immutable
self.statslist = [self.cpu_stats, self.vmem_stats, self.disk_stats,
self.net_stats]
def test_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001)
for x in range(100):
new = psutil.cpu_times_percent(interval=None)
for percent in new:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(new), last, new)
last = new
def test_per_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001, percpu=True)
self.assertEqual(len(last), psutil.cpu_count())
for x in range(100):
new = psutil.cpu_times_percent(interval=None, percpu=True)
for cpu in new:
for percent in cpu:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(cpu), last, new)
last = new
def test_per_cpu_times_percent_negative(self):
# see: https://github.com/giampaolo/psutil/issues/645
psutil.cpu_times_percent(percpu=True)
zero_times = [x._make([0 for x in range(len(x._fields))])
for x in psutil.cpu_times(percpu=True)]
with mock.patch('psutil.cpu_times', return_value=zero_times):
for cpu in psutil.cpu_times_percent(percpu=True):
for percent in cpu:
self._test_cpu_percent(percent, None, None)
def run(self, *unused):
results = {}
data = psutil.cpu_times_percent(interval=1, percpu=True)
cpu_number = -1
for cpu in data:
core = {}
cpu_number = cpu_number+1
results[cpu_number] = {}
for key in cpu._fields:
core[key] = getattr(cpu, key)
results[cpu_number] = core
return results
def value(self):
return psutil.cpu_times_percent()
def test_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001)
for x in range(100):
new = psutil.cpu_times_percent(interval=None)
for percent in new:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(new), last, new)
last = new
def test_per_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001, percpu=True)
self.assertEqual(len(last), psutil.cpu_count())
for x in range(100):
new = psutil.cpu_times_percent(interval=None, percpu=True)
for cpu in new:
for percent in cpu:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(cpu), last, new)
last = new
def test_per_cpu_times_percent_negative(self):
# see: https://github.com/giampaolo/psutil/issues/645
psutil.cpu_times_percent(percpu=True)
zero_times = [x._make([0 for x in range(len(x._fields))])
for x in psutil.cpu_times(percpu=True)]
with mock.patch('psutil.cpu_times', return_value=zero_times):
for cpu in psutil.cpu_times_percent(percpu=True):
for percent in cpu:
self._test_cpu_percent(percent, None, None)
def test_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001)
for x in range(100):
new = psutil.cpu_times_percent(interval=None)
for percent in new:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(new), last, new)
last = new
def test_per_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001, percpu=True)
self.assertEqual(len(last), psutil.cpu_count())
for x in range(100):
new = psutil.cpu_times_percent(interval=None, percpu=True)
for cpu in new:
for percent in cpu:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(cpu), last, new)
last = new
def test_per_cpu_times_percent_negative(self):
# see: https://github.com/giampaolo/psutil/issues/645
psutil.cpu_times_percent(percpu=True)
zero_times = [x._make([0 for x in range(len(x._fields))])
for x in psutil.cpu_times(percpu=True)]
with mock.patch('psutil.cpu_times', return_value=zero_times):
for cpu in psutil.cpu_times_percent(percpu=True):
for percent in cpu:
self._test_cpu_percent(percent, None, None)
def get_sys_cpu():
sys_cpu = {}
cpu_time = psutil.cpu_times_percent(interval=1)
sys_cpu['percent'] = psutil.cpu_percent(interval=1)
sys_cpu['lcpu_percent'] = psutil.cpu_percent(interval=1, percpu=True)
sys_cpu['user'] = cpu_time.user
sys_cpu['nice'] = cpu_time.nice
sys_cpu['system'] = cpu_time.system
sys_cpu['idle'] = cpu_time.idle
sys_cpu['iowait'] = cpu_time.iowait
sys_cpu['irq'] = cpu_time.irq
sys_cpu['softirq'] = cpu_time.softirq
sys_cpu['guest'] = cpu_time.guest
return sys_cpu
def test_no_procfs_on_import(self, tb):
my_procfs = tempfile.mkdtemp()
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
try:
orig_open = open
def open_mock(name, *args, **kwargs):
if name.startswith('/proc'):
raise IOError(errno.ENOENT, 'rejecting access for test')
return orig_open(name, *args, **kwargs)
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock):
importlib.reload(psutil)
assert tb.called
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.cpu_percent)
self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
self.assertRaises(IOError, psutil.cpu_times_percent)
self.assertRaises(
IOError, psutil.cpu_times_percent, percpu=True)
psutil.PROCFS_PATH = my_procfs
self.assertEqual(psutil.cpu_percent(), 0)
self.assertEqual(sum(psutil.cpu_times_percent()), 0)
# since we don't know the number of CPUs at import time,
# we awkwardly say there are none until the second call
per_cpu_percent = psutil.cpu_percent(percpu=True)
self.assertEqual(sum(per_cpu_percent), 0)
# ditto awkward length
per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
# much user, very busy
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
self.assertNotEqual(psutil.cpu_percent(), 0)
self.assertNotEqual(
sum(psutil.cpu_percent(percpu=True)), 0)
self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
self.assertNotEqual(
sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
finally:
shutil.rmtree(my_procfs)
importlib.reload(psutil)
self.assertEqual(psutil.PROCFS_PATH, '/proc')
def test_no_procfs_on_import(self, tb):
my_procfs = tempfile.mkdtemp()
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
try:
orig_open = open
def open_mock(name, *args, **kwargs):
if name.startswith('/proc'):
raise IOError(errno.ENOENT, 'rejecting access for test')
return orig_open(name, *args, **kwargs)
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock):
importlib.reload(psutil)
assert tb.called
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.cpu_percent)
self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
self.assertRaises(IOError, psutil.cpu_times_percent)
self.assertRaises(
IOError, psutil.cpu_times_percent, percpu=True)
psutil.PROCFS_PATH = my_procfs
self.assertEqual(psutil.cpu_percent(), 0)
self.assertEqual(sum(psutil.cpu_times_percent()), 0)
# since we don't know the number of CPUs at import time,
# we awkwardly say there are none until the second call
per_cpu_percent = psutil.cpu_percent(percpu=True)
self.assertEqual(sum(per_cpu_percent), 0)
# ditto awkward length
per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
# much user, very busy
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
self.assertNotEqual(psutil.cpu_percent(), 0)
self.assertNotEqual(
sum(psutil.cpu_percent(percpu=True)), 0)
self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
self.assertNotEqual(
sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
finally:
shutil.rmtree(my_procfs)
importlib.reload(psutil)
self.assertEqual(psutil.PROCFS_PATH, '/proc')
def test_no_procfs_on_import(self, tb):
my_procfs = tempfile.mkdtemp()
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
try:
orig_open = open
def open_mock(name, *args, **kwargs):
if name.startswith('/proc'):
raise IOError(errno.ENOENT, 'rejecting access for test')
return orig_open(name, *args, **kwargs)
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
with mock.patch(patch_point, side_effect=open_mock):
importlib.reload(psutil)
assert tb.called
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.cpu_percent)
self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
self.assertRaises(IOError, psutil.cpu_times_percent)
self.assertRaises(
IOError, psutil.cpu_times_percent, percpu=True)
psutil.PROCFS_PATH = my_procfs
self.assertEqual(psutil.cpu_percent(), 0)
self.assertEqual(sum(psutil.cpu_times_percent()), 0)
# since we don't know the number of CPUs at import time,
# we awkwardly say there are none until the second call
per_cpu_percent = psutil.cpu_percent(percpu=True)
self.assertEqual(sum(per_cpu_percent), 0)
# ditto awkward length
per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
# much user, very busy
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
self.assertNotEqual(psutil.cpu_percent(), 0)
self.assertNotEqual(
sum(psutil.cpu_percent(percpu=True)), 0)
self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
self.assertNotEqual(
sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
finally:
shutil.rmtree(my_procfs)
importlib.reload(psutil)
self.assertEqual(psutil.PROCFS_PATH, '/proc')