def test_clock_monotonic(self):
a = time.clock_gettime(time.CLOCK_MONOTONIC)
b = time.clock_gettime(time.CLOCK_MONOTONIC)
self.assertLessEqual(a, b)
python类clock_gettime()的实例源码
def _monotonic_clock():
return time.clock_gettime(time.CLOCK_MONOTONIC)
def run(self):
loops = 0 # loop counter
input_not_ready = False # to control log spam
data_requested = self._frame_len # to keep the alignment intact
# capture the starting time
start_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
while not self._end.is_set():
# increment loop counter
loops += 1
# try to read a frame from the input -- should be there all the time
try:
data = os.read(self._pipe_fd, data_requested)
# so we apparently got some data, clear the flag and calculate things
input_not_ready = False
data_len = len(data)
if data_len != 0 and data_len != self._frame_len:
log.warning('AacProcessor: Got partial buffer of size {}'.format(data_len))
# call the callback
self._play(data)
# calculate requested size for the next iteration
data_requested -= data_len
if data_requested == 0:
data_requested = self._frame_len
except OSError as e:
if e.errno == errno.EAGAIN:
# prevent spamming the log with megabytes of text
if not input_not_ready:
log.error('AacProcessor: Buffer not ready')
input_not_ready = True
else:
raise
# calculate next transmission time
next_time = start_time + self._frame_period * loops
sleep_time = max(0, self._frame_period + (next_time - time.clock_gettime(time.CLOCK_MONOTONIC_RAW)))
time.sleep(sleep_time)
def test_clock_realtime(self):
time.clock_gettime(time.CLOCK_REALTIME)
def test_clock_monotonic(self):
a = time.clock_gettime(time.CLOCK_MONOTONIC)
b = time.clock_gettime(time.CLOCK_MONOTONIC)
self.assertLessEqual(a, b)
def test_clock_settime(self):
t = time.clock_gettime(time.CLOCK_REALTIME)
try:
time.clock_settime(time.CLOCK_REALTIME, t)
except PermissionError:
pass
if hasattr(time, 'CLOCK_MONOTONIC'):
self.assertRaises(OSError,
time.clock_settime, time.CLOCK_MONOTONIC, 0)
def test_monotonic_settime(self):
t1 = time.monotonic()
realtime = time.clock_gettime(time.CLOCK_REALTIME)
# jump backward with an offset of 1 hour
try:
time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
except PermissionError as err:
self.skipTest(err)
t2 = time.monotonic()
time.clock_settime(time.CLOCK_REALTIME, realtime)
# monotonic must not be affected by system clock updates
self.assertGreaterEqual(t2, t1)
def test_clock_realtime(self):
time.clock_gettime(time.CLOCK_REALTIME)
def test_clock_settime(self):
t = time.clock_gettime(time.CLOCK_REALTIME)
try:
time.clock_settime(time.CLOCK_REALTIME, t)
except PermissionError:
pass
if hasattr(time, 'CLOCK_MONOTONIC'):
self.assertRaises(OSError,
time.clock_settime, time.CLOCK_MONOTONIC, 0)
def test_monotonic_settime(self):
t1 = time.monotonic()
realtime = time.clock_gettime(time.CLOCK_REALTIME)
# jump backward with an offset of 1 hour
try:
time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
except PermissionError as err:
self.skipTest(err)
t2 = time.monotonic()
time.clock_settime(time.CLOCK_REALTIME, realtime)
# monotonic must not be affected by system clock updates
self.assertGreaterEqual(t2, t1)
def test_clock_realtime(self):
time.clock_gettime(time.CLOCK_REALTIME)
def test_clock_monotonic(self):
a = time.clock_gettime(time.CLOCK_MONOTONIC)
b = time.clock_gettime(time.CLOCK_MONOTONIC)
self.assertLessEqual(a, b)
def test_clock_settime(self):
t = time.clock_gettime(time.CLOCK_REALTIME)
try:
time.clock_settime(time.CLOCK_REALTIME, t)
except PermissionError:
pass
if hasattr(time, 'CLOCK_MONOTONIC'):
self.assertRaises(OSError,
time.clock_settime, time.CLOCK_MONOTONIC, 0)
def test_monotonic_settime(self):
t1 = time.monotonic()
realtime = time.clock_gettime(time.CLOCK_REALTIME)
# jump backward with an offset of 1 hour
try:
time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
except PermissionError as err:
self.skipTest(err)
t2 = time.monotonic()
time.clock_settime(time.CLOCK_REALTIME, realtime)
# monotonic must not be affected by system clock updates
self.assertGreaterEqual(t2, t1)
def main_loop(bar, inputs = None):
# TODO: remove eventinputs again?
#inputs += bar.widget.eventinputs()
if inputs == None:
inputs = global_inputs
global_update = True
def signal_quit(signal, frame):
quit_main_loop()
signal.signal(signal.SIGINT, signal_quit)
signal.signal(signal.SIGTERM, signal_quit)
# main loop
while not core.shutdown_requested() and bar.is_running():
now = time.clock_gettime(time.CLOCK_MONOTONIC)
if bar.widget.maybe_timeout(now):
global_update = True
data_ready = []
if global_update:
painter = bar.painter()
painter.widget(bar.widget)
data_ready = select.select(inputs,[],[], 0.00)[0]
if not data_ready:
#print("REDRAW: " + str(time.clock_gettime(time.CLOCK_MONOTONIC)))
painter.flush()
global_update = False
else:
pass
#print("more data already ready")
if not data_ready:
# wait for new data
next_timeout = now + 360 # wait for at most one hour until the next bar update
to = bar.widget.next_timeout()
if to != None:
next_timeout = min(next_timeout, to)
now = time.clock_gettime(time.CLOCK_MONOTONIC)
next_timeout -= now
next_timeout = max(next_timeout,0.1)
#print("next timeout = " + str(next_timeout))
data_ready = select.select(inputs,[],[], next_timeout)[0]
if core.shutdown_requested():
break
if not data_ready:
pass #print('timeout!')
else:
for x in data_ready:
x.process()
global_update = True
bar.proc.kill()
for i in inputs:
i.kill()
bar.proc.wait()
def memory_cpu_stats_middleware(get_response):
import time
import psutil
from collections import Counter
from django_statsd.clients import statsd
from .global_request import get_view_name
from django.conf import settings
mem_entries = (
'rss',
'shared_clean', 'shared_dirty',
'private_clean', 'private_dirty'
)
def summed(info):
res = dict.fromkeys(mem_entries, 0)
for path_info in info:
for name in mem_entries:
res[name] += getattr(path_info, name)
return res
def middleware(request):
global SAMPLE_COUNT
SAMPLE_COUNT += 1
if SAMPLE_COUNT >= settings.SAMPLE_RATE:
SAMPLE_COUNT = 0
cpu_before = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
mem_before = summed(psutil.Process().memory_maps())
try:
return get_response(request)
finally:
cpu_after = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
statsd.gauge(
'cpu.{}'.format(get_view_name()),
cpu_after - cpu_before)
mem_after = summed(psutil.Process().memory_maps())
mem_key_base = 'memory.{}.{{}}'.format(get_view_name())
for name, after in mem_after.items():
diff = after - mem_before[name]
statsd.gauge(mem_key_base.format(name) + '.total', after)
statsd.gauge(mem_key_base.format(name) + '.change', diff)
else:
return get_response(request)
return middleware