def cpu_times(percpu=False):
    """Return system-wide CPU times as a namedtuple.

    Every CPU time represents the seconds the CPU has spent in the
    given mode. Which fields are available depends on the platform:

     - user
     - system
     - idle
     - nice (UNIX)
     - iowait (Linux)
     - irq (Linux, FreeBSD)
     - softirq (Linux)
     - steal (Linux >= 2.6.11)
     - guest (Linux >= 2.6.24)
     - guest_nice (Linux >= 3.2.0)

    When *percpu* is true, return a list of namedtuples instead, one
    per CPU: the first element refers to the first CPU, the second
    element to the second CPU, and so on. The order of the list is
    consistent across calls.
    """
    # Both variants are delegated to the platform-specific module.
    if percpu:
        return _psplatform.per_cpu_times()
    return _psplatform.cpu_times()
python类times()的实例源码
def cpu_times(percpu=False):
    """Return system-wide CPU times as a namedtuple.

    Every CPU time represents the seconds the CPU has spent in the
    given mode. Which fields are available depends on the platform:

     - user
     - system
     - idle
     - nice (UNIX)
     - iowait (Linux)
     - irq (Linux, FreeBSD)
     - softirq (Linux)
     - steal (Linux >= 2.6.11)
     - guest (Linux >= 2.6.24)
     - guest_nice (Linux >= 3.2.0)

    When *percpu* is true, return a list of namedtuples instead, one
    per CPU: the first element refers to the first CPU, the second
    element to the second CPU, and so on. The order of the list is
    consistent across calls.
    """
    # Both variants are delegated to the platform-specific module.
    if percpu:
        return _psplatform.per_cpu_times()
    return _psplatform.cpu_times()
def test_cmdline(self):
    """Check that Process.cmdline() matches the command line used to
    spawn a test subprocess.
    """
    cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
    sproc = get_test_subprocess(cmdline)
    expected = ' '.join(cmdline)
    try:
        self.assertEqual(
            ' '.join(psutil.Process(sproc.pid).cmdline()), expected)
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        if not (NETBSD or OPENBSD):
            raise
        # On those platforms only the executable (first item) is checked.
        self.assertEqual(
            psutil.Process(sproc.pid).cmdline()[0], PYTHON)
def cpu_times(percpu=False):
    """Return system-wide CPU times as a namedtuple.

    Every CPU time represents the seconds the CPU has spent in the
    given mode. Which fields are available depends on the platform:

     - user
     - system
     - idle
     - nice (UNIX)
     - iowait (Linux)
     - irq (Linux, FreeBSD)
     - softirq (Linux)
     - steal (Linux >= 2.6.11)
     - guest (Linux >= 2.6.24)
     - guest_nice (Linux >= 3.2.0)

    When *percpu* is true, return a list of namedtuples instead, one
    per CPU: the first element refers to the first CPU, the second
    element to the second CPU, and so on. The order of the list is
    consistent across calls.
    """
    # Both variants are delegated to the platform-specific module.
    if percpu:
        return _psplatform.per_cpu_times()
    return _psplatform.cpu_times()
def _cpu_tot_time(times):
    """Given a cpu_time() ntuple calculates the total CPU time
    (including idle time).

    The total is the sum of every field in *times*; on Linux the
    "guest" and "guest_nice" fields, when present, are subtracted
    from that sum.
    """
    tot = sum(times)
    if LINUX:
        # On Linux guest times are already accounted in "user" or
        # "nice" times, so we subtract them from total.
        # Htop does the same. References:
        # https://github.com/giampaolo/psutil/pull/940
        # http://unix.stackexchange.com/questions/178045
        # https://github.com/torvalds/linux/blob/
        #     447976ef4fd09b1be88b316d1a81553f1aa7cd07/kernel/sched/
        #     cputime.c#L158
        # "guest" exists on Linux 2.6.24+, "guest_nice" on Linux 3.2.0+;
        # a missing field contributes 0.
        for field in ("guest", "guest_nice"):
            tot -= getattr(times, field, 0)
    return tot
def cpu_times(percpu=False):
    """Return system-wide CPU times as a namedtuple.

    Every CPU time represents the seconds the CPU has spent in the
    given mode. Which fields are available depends on the platform:

     - user
     - system
     - idle
     - nice (UNIX)
     - iowait (Linux)
     - irq (Linux, FreeBSD)
     - softirq (Linux)
     - steal (Linux >= 2.6.11)
     - guest (Linux >= 2.6.24)
     - guest_nice (Linux >= 3.2.0)

    When *percpu* is true, return a list of namedtuples instead, one
    per CPU: the first element refers to the first CPU, the second
    element to the second CPU, and so on. The order of the list is
    consistent across calls.
    """
    # Both variants are delegated to the platform-specific module.
    if percpu:
        return _psplatform.per_cpu_times()
    return _psplatform.cpu_times()
def test_cmdline(self):
    """Check that Process.cmdline() matches the command line used to
    spawn a test subprocess.
    """
    cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
    sproc = get_test_subprocess(cmdline)
    expected = ' '.join(cmdline)
    try:
        self.assertEqual(
            ' '.join(psutil.Process(sproc.pid).cmdline()), expected)
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        if not (NETBSD or OPENBSD):
            raise
        # On those platforms only the executable (first item) is checked.
        self.assertEqual(
            psutil.Process(sproc.pid).cmdline()[0], PYTHON)
def _cpu_tot_time(times):
    """Given a cpu_time() ntuple calculates the total CPU time
    (including idle time).

    The total is the sum of every field in *times*; on Linux the
    "guest" and "guest_nice" fields, when present, are subtracted
    from that sum.
    """
    tot = sum(times)
    if LINUX:
        # On Linux guest times are already accounted in "user" or
        # "nice" times, so we subtract them from total.
        # Htop does the same. References:
        # https://github.com/giampaolo/psutil/pull/940
        # http://unix.stackexchange.com/questions/178045
        # https://github.com/torvalds/linux/blob/
        #     447976ef4fd09b1be88b316d1a81553f1aa7cd07/kernel/sched/
        #     cputime.c#L158
        # "guest" exists on Linux 2.6.24+, "guest_nice" on Linux 3.2.0+;
        # a missing field contributes 0.
        for field in ("guest", "guest_nice"):
            tot -= getattr(times, field, 0)
    return tot
def test_cmdline(self):
    """Check that Process.cmdline() matches the command line used to
    spawn a test subprocess.
    """
    cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
    sproc = get_test_subprocess(cmdline)
    expected = ' '.join(cmdline)
    try:
        self.assertEqual(
            ' '.join(psutil.Process(sproc.pid).cmdline()), expected)
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        if not (NETBSD or OPENBSD):
            raise
        # On those platforms only the executable (first item) is checked.
        self.assertEqual(
            psutil.Process(sproc.pid).cmdline()[0], PYTHON)
def _cpu_tot_time(times):
    """Given a cpu_time() ntuple calculates the total CPU time
    (including idle time).

    The total is the sum of every field in *times*; on Linux the
    "guest" and "guest_nice" fields, when present, are subtracted
    from that sum.
    """
    tot = sum(times)
    if LINUX:
        # On Linux guest times are already accounted in "user" or
        # "nice" times, so we subtract them from total.
        # Htop does the same. References:
        # https://github.com/giampaolo/psutil/pull/940
        # http://unix.stackexchange.com/questions/178045
        # https://github.com/torvalds/linux/blob/
        #     447976ef4fd09b1be88b316d1a81553f1aa7cd07/kernel/sched/
        #     cputime.c#L158
        # "guest" exists on Linux 2.6.24+, "guest_nice" on Linux 3.2.0+;
        # a missing field contributes 0.
        for field in ("guest", "guest_nice"):
            tot -= getattr(times, field, 0)
    return tot
def search(self):
    '''Top level search.

    Creates the log and data writers (named after the first input
    file), then runs search_data() on every entry of
    self.data_handle.data_list, closing each entry and collecting
    garbage once it has been searched.
    '''
    logger.debug("Start searching...")
    logger.debug(self.get_info())

    first = self.data_handle.data_list[0]
    # Output files share one stem: the first input's base name with the
    # '.h5', '.fits' and '.fil' substrings removed (same order as before).
    stem = first.filename.split('/')[-1]
    for ext in ('.h5', '.fits', '.fil'):
        stem = stem.replace(ext, '')
    out_prefix = '%s/%s' % (self.out_dir.rstrip('/'), stem)

    self.logwriter = file_writers.LogWriter('%s.log' % out_prefix)
    self.filewriter = file_writers.FileWriter('%s.dat' % out_prefix, first.header)

    logger.info("Start ET search for %s" % first.filename)
    self.logwriter.info("Start ET search for %s" % (first.filename))

    for ii, target_data_obj in enumerate(self.data_handle.data_list):
        self.search_data(target_data_obj)
        ##EE-benshmark cProfile.runctx('self.search_data(target_data_obj)',globals(),locals(),filename='profile_M%2.1f_S%2.1f_t%i'%(self.max_drift,self.snr,int(os.times()[-1])))

        #----------------------------------------
        #Closing instance. Collect garbage.
        self.data_handle.data_list[ii].close()
        gc.collect()
def t(s, sinceBegin=False):
    """Pretty-print timing information.

    Prints the wall, user and system time elapsed, labelled with *s*.
    When *sinceBegin* is false the interval is measured from the
    previous checkpoint (CTIME/CTIMES) and the checkpoint is then
    advanced to now; when true it is measured from BEGIN_TIME/
    BEGIN_TIMES and the checkpoint is left untouched.
    """
    global CTIME, CTIMES
    nt = int(time.time())
    ntimes = os.times()
    if sinceBegin:
        ct, cts = BEGIN_TIME, BEGIN_TIMES
    else:
        ct, cts = CTIME, CTIMES
    timing = ': %dmin, %dsec (wall) %dmin, %dsec (user) %dmin, %dsec (system)' \
        % _minSec(nt - ct, ntimes[0] - cts[0], ntimes[1] - cts[1])
    # A single pre-formatted argument prints the same text under both the
    # Python 2 print statement and the Python 3 print function.
    print('# TIME %s %s' % (s, timing))
    if not sinceBegin:
        CTIME = nt
        CTIMES = ntimes
def _get_time_times(timer=os.times):
    """Return the sum of the first two fields reported by *timer*.

    With the default timer, os.times(), those two fields are the user
    and system CPU times of the current process.
    """
    t = timer()
    return t[0] + t[1]
# Using getrusage(3) is better than clock(3) if available:
# on some systems (e.g. FreeBSD), getrusage has a higher resolution.
# Furthermore, on a POSIX system, clock(3) returns microseconds, which
# wrap around after 36min.
def changeCycle(self):
    """Wallpaper change cycle.

    Loops forever. On each pass, if self.deltaTime() has reached
    self.timeout, self.change_next() is called and self.time is reset
    to the current time. The loop then samples the user CPU time from
    os.times() and sleeps for 0.1s whenever get_percentage() returns
    more than 30.0.
    """
    # NOTE(review): the nesting was reconstructed from a flattened
    # listing; everything after ``while True`` is assumed to be inside
    # the loop (it would be unreachable otherwise) -- confirm.
    uold = os.times()[0]  # only the first (user time) field is used
    while True:
        if self.deltaTime() >= self.timeout:
            self.change_next()
            self.time = time.time()
        unew = os.times()[0]
        start = time.time()
        # NOTE(review): uold is never refreshed, so every comparison is
        # made against the initial sample -- confirm this is intended.
        if get_percentage(unew, uold, start) > 30.0:
            time.sleep(0.1)
def _get_time_times(timer=os.times):
    """Return the sum of the first two fields reported by *timer*.

    With the default timer, os.times(), those two fields are the user
    and system CPU times of the current process.
    """
    t = timer()
    return t[0] + t[1]
# Using getrusage(3) is better than clock(3) if available:
# on some systems (e.g. FreeBSD), getrusage has a higher resolution.
# Furthermore, on a POSIX system, clock(3) returns microseconds, which
# wrap around after 36min.
def threads(self):
    """Return the threads opened by the process as a list of
    (id, user_time, system_time) namedtuples, giving each thread's
    id and its CPU times (user/system).

    On OpenBSD this method requires root access.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.threads()
def cpu_times(self):
    """Return a (user, system, children_user, children_system)
    namedtuple representing the accumulated process time, in seconds.

    This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are
    always set to 0.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.cpu_times()
def threads(self):
    """Return the threads opened by the process as a list of
    (id, user_time, system_time) namedtuples, giving each thread's
    id and its CPU times (user/system).

    On OpenBSD this method requires root access.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.threads()
def cpu_times(self):
    """Return a (user, system, children_user, children_system)
    namedtuple representing the accumulated process time, in seconds.

    This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are
    always set to 0.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.cpu_times()
def threads(self):
    """Return the threads opened by the process as a list of
    (id, user_time, system_time) namedtuples, giving each thread's
    id and its CPU times (user/system).

    On OpenBSD this method requires root access.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.threads()
def cpu_times(self):
    """Return a (user, system, children_user, children_system)
    namedtuple representing the accumulated process time, in seconds.

    This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are
    always set to 0.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.cpu_times()
def threads(self):
    """Return the threads opened by the process as a list of
    (id, user_time, system_time) namedtuples, giving each thread's
    id and its CPU times (user/system).

    On OpenBSD this method requires root access.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.threads()
def cpu_times(self):
    """Return a (user, system, children_user, children_system)
    namedtuple representing the accumulated process time, in seconds.

    This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are
    always set to 0.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.cpu_times()
def _monotonic_time():
    """Return a clock reading, in seconds, suitable for measuring
    elapsed time between two calls.

    time.monotonic() is used when the interpreter provides it. The
    original source, os.times()[4], is kept as the fallback; it is
    documented to be zero on Windows, where it would never advance.
    Only differences between two readings are meaningful.
    """
    import time

    clock = getattr(time, "monotonic", None)
    if clock is not None:
        return clock()
    # Fallback for interpreters without time.monotonic(): field 4 of
    # os.times() is the elapsed real time since a fixed point in the past.
    return os.times()[4]
def _get_time_times(timer=os.times):
    """Return the sum of the first two fields reported by *timer*.

    With the default timer, os.times(), those two fields are the user
    and system CPU times of the current process.
    """
    t = timer()
    return t[0] + t[1]
# Using getrusage(3) is better than clock(3) if available:
# on some systems (e.g. FreeBSD), getrusage has a higher resolution.
# Furthermore, on a POSIX system, clock(3) returns microseconds, which
# wrap around after 36min.
def test_cpu_times(self):
    """Sanity-check the values returned by Process.cpu_times() for the
    current process.
    """
    times = psutil.Process().cpu_times()
    # At least one of user/system time is expected to be non-zero.
    assert (times.user > 0.0) or (times.system > 0.0), times
    assert (times.children_user >= 0.0), times
    assert (times.children_system >= 0.0), times
    # make sure returned values can be pretty printed with strftime
    for name in times._fields:
        value = getattr(times, name)
        time.strftime("%H:%M:%S", time.localtime(value))
# Test Process.cpu_times() against os.times()
# os.times() is broken on Python 2.6
# http://bugs.python.org/issue1040026
# XXX fails on OSX: not sure if it's for os.times(). We should
# try this with Python 2.7 and re-enable the test.
def test_cpu_times_2(self):
    """Compare Process.cpu_times() user/system values against
    os.times() for the current process.
    """
    user_time, kernel_time = psutil.Process().cpu_times()[:2]
    utime, ktime = os.times()[:2]
    # Use os.times()[:2] as base values to compare our results
    # using a tolerance of +/- 0.1 seconds.
    # It will fail if the difference between the values is > 0.1s.
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))
    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
def threads(self):
    """Return the threads opened by the process as a list of
    (id, user_time, system_time) namedtuples, giving each thread's
    id and its CPU times (user/system).

    On OpenBSD this method requires root access.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.threads()
def cpu_times(self):
    """Return a (user, system, children_user, children_system)
    namedtuple representing the accumulated process time, in seconds.

    This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are
    always set to 0.
    """
    # Thin delegation to the platform-specific implementation object.
    return self._proc.cpu_times()
def initStats(self):
    """Reset the search statistics counters.

    Resets the class-level counters sNode.n and StateSpace.n, zeroes
    self.cycle_check_pruned, and stores the current user CPU time
    (os.times()[0]) in self.total_search_time.
    """
    sNode.n = 0
    # Starts at 1: per the original note, the initial state has already
    # been generated by the time search is called.
    StateSpace.n = 1
    self.cycle_check_pruned = 0
    # Holds the user CPU time at initialisation. The earlier
    # ``total_search_time = 0`` assignment was a dead store (overwritten
    # immediately) and has been removed.
    self.total_search_time = os.times()[0]