def __init__(
        self,
        callback,
        timer=TIMER,
        interval=INTERVAL_SECONDS,
        timer_signal=TIMER_SIGNAL):
    """Store ``callback`` and start a repeating interval timer.

    Args:
        callback: Callable kept on the instance as ``self._callback``.
        timer: Interval-timer id handed to ``signal.setitimer``.
        interval: Seconds used for both the first expiry and the repeat period.
        timer_signal: Signal number whose handler is set to ``self.callback``.
    """
    assert callable(callback), 'callback must be callable'

    # Populate every attribute *before* installing the handler and arming the
    # timer, so a tick that arrives immediately never sees a half-built object.
    self._callback = callback
    # The previous handler/timer are deliberately not captured yet
    # ("cheating for now"); both stay None.
    self.oldaction = None
    self.oldtimer = None

    # NOTE(review): the installed handler is ``self.callback`` (presumably a
    # method defined elsewhere on the class), not the ``callback`` argument
    # itself -- confirm against the rest of the class.
    signal.signal(timer_signal, self.callback)
    signal.setitimer(timer, interval, interval)
# python类setitimer()的实例源码 -- Python example source code for setitimer()
def execution_timeout(timeout):
    """Generator that raises ``TimeoutError`` if the guarded code runs too long.

    An ``ITIMER_REAL`` timer of ``timeout`` seconds is armed around the single
    ``yield``; when it expires the SIGALRM handler raises ``TimeoutError``.
    On exit the previous handler is restored and any timer that was already
    running is re-armed with its remaining time.

    NOTE(review): no decorator is visible here; presumably this is wrapped
    with ``contextlib.contextmanager`` by the caller -- confirm.

    Args:
        timeout: Seconds before ``TimeoutError`` is raised.
    """
    def timed_out(signum, sigframe):
        raise TimeoutError

    # Install the handler *before* arming the timer: with a very short timeout
    # the signal could otherwise be delivered to the previous handler.
    old_hdl = signal.signal(signal.SIGALRM, timed_out)
    delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0)
    now = time.time()
    try:
        yield
    finally:
        # Disarm our timer first so it cannot fire while handlers are swapped.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # Restore the previous handler before re-arming the outer timer, so a
        # near-immediate outer expiry is delivered to the right handler.
        signal.signal(signal.SIGALRM, old_hdl)
        # inner timeout must be smaller, or the timer event will be delayed
        if delay:
            elapsed = time.time() - now
            # Keep a tiny positive value: 0 would cancel the outer timer.
            delay = max(delay - elapsed, 0.000001)
        else:
            delay = 0
        signal.setitimer(signal.ITIMER_REAL, delay, interval)
def sig_vtalrm(self, *args):
    """SIGVTALRM handler: count invocations and switch the timer off.

    Marks the handler as called, disables ``ITIMER_VIRTUAL`` on the call
    where ``hndl_count`` equals 3, and raises ``signal.ItimerError`` if it is
    invoked again after that (``hndl_count`` greater than 3).
    """
    self.hndl_called = True
    seen = self.hndl_count

    if seen > 3:
        # Reaching this point means the timer kept firing after being disabled.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")
    if seen == 3:
        # Turn ITIMER_VIRTUAL off; no further invocations are expected.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Check that ITIMER_PROF fires and ends up disabled.

    Arms a 0.2 s repeating profiling timer with ``self.sig_prof`` as the
    SIGPROF handler, burns CPU for up to 60 s until the timer reads
    ``(0.0, 0.0)``, and skips the test if that never happens.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    began = time.time()
    timer_stopped = False
    while time.time() - began < 60.0:
        # Consume CPU time so the profiling timer advances.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            # The sig_prof handler has stopped this itimer.
            timer_stopped = True
            break
    if not timer_stopped:  # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # The profiling itimer must now be disarmed ...
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # ... and the handler must have run at least once.
    self.assertEqual(self.hndl_called, True)
def sig_vtalrm(self, *args):
    """SIGVTALRM handler that disables the virtual timer on the call with count 3.

    Sets ``hndl_called``, raises ``signal.ItimerError`` when called with
    ``hndl_count`` above 3, and increments ``hndl_count`` otherwise.
    """
    self.hndl_called = True
    invocations = self.hndl_count

    if invocations > 3:
        # The timer should already have been disabled by an earlier call.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")
    if invocations == 3:
        # Disable ITIMER_VIRTUAL; this handler should not run again.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if test_support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if test_support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Exercise ITIMER_PROF: the handler must run and the timer must stop.

    The busy loop gives up after 60 s of wall-clock time and skips the test.
    """
    disarmed = (0.0, 0.0)

    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    t0 = time.time()
    while time.time() - t0 < 60.0:
        # Spend CPU time so that the profiling timer counts down.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == disarmed:
            break  # the sig_prof handler stopped this itimer
    else:
        # Issue 8424: the loop ran out of time without the timer stopping.
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # The timer is expected to read as disarmed now.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # The handler is expected to have been invoked.
    self.assertEqual(self.hndl_called, True)
def sig_vtalrm(self, *args):
    """Handle SIGVTALRM, counting calls and stopping ITIMER_VIRTUAL.

    The timer is disabled when ``hndl_count`` is exactly 3; a call with a
    larger count raises ``signal.ItimerError``.
    """
    self.hndl_called = True
    count = self.hndl_count

    if count > 3:
        # Still being called although the timer was supposed to be off.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")
    if count == 3:
        # Last expected call: switch ITIMER_VIRTUAL off.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if test_support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if test_support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Verify that the profiling interval timer fires and is then disabled.

    Installs ``self.sig_prof`` for SIGPROF, arms ITIMER_PROF at 0.2 s, and
    spins for at most 60 s waiting for the timer to read ``(0.0, 0.0)``.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    started = time.time()
    saw_disarmed = False
    while time.time() - started < 60.0:
        # Busy work: the profiling timer only advances while CPU is used.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            saw_disarmed = True  # sig_prof handler stopped this itimer
            break
    if not saw_disarmed:  # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # After the loop the profiling itimer should be fully cleared.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # The handler should also have recorded that it was called.
    self.assertEqual(self.hndl_called, True)
def test_itimer_prof(self):
    """Test ITIMER_PROF delivery and self-disabling via the SIGPROF handler.

    Skips instead of failing when 60 s pass without the timer being cleared.
    """
    cleared = (0.0, 0.0)

    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    loop_start = time.time()
    while time.time() - loop_start < 60.0:
        # Do some arithmetic to accumulate CPU time.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == cleared:
            break  # the handler has stopped the timer
    else:
        # Issue 8424: timed out waiting for the handler to stop the timer.
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # Timer state must be cleared at this point.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # And the handler must have been called.
    self.assertEqual(self.hndl_called, True)
def set_blocking_signal_threshold(self, seconds, action):
    """Configure the signal sent when the ioloop blocks for too long.

    Stores ``seconds`` as the blocking threshold and, unless it is ``None``
    (which disables the feature), installs ``action`` as the SIGALRM
    handler. When ``action`` is ``None`` the default SIGALRM disposition is
    installed, so the process is killed if it blocks for too long.

    ``action`` is an ordinary Python signal handler; see the ``signal``
    module documentation. Needs Python 2.6 on a unixy platform: without
    ``signal.setitimer`` an error is logged and nothing else happens.
    """
    if hasattr(signal, "setitimer"):
        self._blocking_signal_threshold = seconds
        if seconds is None:
            return
        handler = signal.SIG_DFL if action is None else action
        signal.signal(signal.SIGALRM, handler)
    else:
        logging.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
def set_blocking_signal_threshold(self, seconds, action):
    """Set how long the ioloop may block before a signal is sent.

    ``seconds=None`` disables the feature; any other value is recorded and
    ``action`` (a Python signal handler, see the ``signal`` module docs) is
    installed for SIGALRM. With ``action=None`` the default disposition is
    used, meaning the process is killed when it blocks for too long.

    Requires Python 2.6 on a unixy platform; if ``signal.setitimer`` is
    missing, an error is logged and the call returns without changes.
    """
    supported = hasattr(signal, "setitimer")
    if not supported:
        logging.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
        return

    self._blocking_signal_threshold = seconds
    if seconds is None:
        return

    if action is None:
        action = signal.SIG_DFL
    signal.signal(signal.SIGALRM, action)
def sig_vtalrm(self, *args):
    """Virtual-timer signal handler used to verify that setitimer can disable it.

    Flags that the handler ran, clears ``ITIMER_VIRTUAL`` when ``hndl_count``
    is 3, and raises ``signal.ItimerError`` for any call with a higher count.
    """
    self.hndl_called = True
    prior_calls = self.hndl_count

    if prior_calls > 3:
        # Unexpected: the timer was meant to be disabled before this call.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")
    if prior_calls == 3:
        # Disable ITIMER_VIRTUAL so this handler is not called any more.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if test_support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if test_support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Run the ITIMER_PROF scenario and assert the timer was switched off.

    Uses ``self.sig_prof`` as the SIGPROF handler and a 0.2 s period; the
    wait loop is capped at 60 s, after which the test is skipped.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    origin = time.time()
    finished = False
    while time.time() - origin < 60.0:
        # Keep the CPU busy; profiling time only elapses while running.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            finished = True  # sig_prof handler stopped this itimer
            break
    if not finished:  # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # Expect the profiling timer to be reported as cleared.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # Expect the handler to have been invoked.
    self.assertEqual(self.hndl_called, True)
def test_itimer_prof(self):
    """ITIMER_PROF test: wait for the SIGPROF handler to disarm the timer.

    If the timer is still armed after 60 s the test is skipped.
    """
    off = (0.0, 0.0)

    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    entered = time.time()
    while time.time() - entered < 60.0:
        # Perform some computation to use up CPU time.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == off:
            break  # timer was stopped by the sig_prof handler
    else:
        # Issue 8424: nothing stopped the timer within the time budget.
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # The itimer should read (0.0, 0.0) once the loop is done.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # The handler should have set its "called" flag.
    self.assertEqual(self.hndl_called, True)
def sig_vtalrm(self, *args):
    """React to SIGVTALRM: track the call count and disable the timer at 3.

    Raises ``signal.ItimerError`` if invoked while ``hndl_count`` exceeds 3.
    """
    self.hndl_called = True
    n_calls = self.hndl_count

    if n_calls > 3:
        # The handler must not run again once the timer has been disabled.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")
    if n_calls == 3:
        # Stop ITIMER_VIRTUAL; this is meant to be the final invocation.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if test_support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if test_support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Confirm that a profiling itimer triggers its handler and gets cleared.

    The handler is ``self.sig_prof``; the loop polls the timer for up to
    60 s and the test is skipped when that budget is exhausted.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    kickoff = time.time()
    handler_stopped_timer = False
    while time.time() - kickoff < 60.0:
        # Generate CPU load for the profiling clock.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            handler_stopped_timer = True
            break
    if not handler_stopped_timer:  # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # The profiling timer is expected to be cleared now.
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # The handler is expected to have run.
    self.assertEqual(self.hndl_called, True)
def test_itimer_prof(self):
    """Arm ITIMER_PROF and check it is disarmed after the handler has run.

    A 60 s wall-clock cap on the polling loop turns a slow machine into a
    skipped test rather than a failure.
    """
    idle = (0.0, 0.0)

    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    first_tick = time.time()
    while time.time() - first_tick < 60.0:
        # Work the CPU so profiling time accumulates.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == idle:
            break  # the sig_prof handler turned the timer off
    else:
        # Issue 8424: loop expired before the timer was turned off.
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # By now the itimer should be (0.0, 0.0).
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # The handler should have marked itself as called.
    self.assertEqual(self.hndl_called, True)
def destroy(self):
    """Disarm ITIMER_REAL and put back the saved SIGALRM handler.

    Does nothing when ``self.setup_done`` is falsy.
    """
    if self.setup_done:
        # Stop the timer first, then restore the handler saved earlier.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, self.prev_signal_handler)
def record(self, duration):
    """Run the block profiler's sampling timer for ``duration`` seconds.

    Arms a repeating ``ITIMER_REAL`` at ``self.SAMPLING_RATE``, sleeps for
    ``duration``, disarms the timer, adds ``duration`` to
    ``self.profile_duration`` and logs the overhead per activity second.

    Args:
        duration: Seconds to keep the sampling timer active.
    """
    self.agent.log('Activating block profiler.')
    signal.setitimer(signal.ITIMER_REAL, self.SAMPLING_RATE, self.SAMPLING_RATE)
    try:
        time.sleep(duration)
    finally:
        # Always disarm: a timer left running after an exception would keep
        # delivering SIGALRM to the process indefinitely.
        signal.setitimer(signal.ITIMER_REAL, 0)
    self.agent.log('Deactivating block profiler.')

    self.profile_duration += duration

    # Skip the ratio while no activity time has accumulated, to avoid a
    # ZeroDivisionError (e.g. the first call with duration == 0).
    if self.profile_duration:
        self.agent.log('Block profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
def destroy(self):
    """Disarm ITIMER_PROF and put back the saved SIGPROF handler.

    Does nothing when ``self.setup_done`` is falsy.
    """
    if self.setup_done:
        # Stop the profiling timer, then restore the handler saved earlier.
        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, self.prev_signal_handler)
def record(self, duration):
    """Run the CPU profiler's sampling timer for ``duration`` seconds.

    Returns immediately when ``self.agent.config.is_profiling_disabled()``
    is true. Otherwise arms a repeating ``ITIMER_PROF`` at
    ``self.SAMPLING_RATE``, sleeps for ``duration``, disarms the timer, adds
    ``duration`` to ``self.profile_duration`` and logs the overhead per
    activity second.

    Args:
        duration: Seconds to keep the sampling timer active.
    """
    if self.agent.config.is_profiling_disabled():
        return

    self.agent.log('Activating CPU profiler.')
    signal.setitimer(signal.ITIMER_PROF, self.SAMPLING_RATE, self.SAMPLING_RATE)
    try:
        time.sleep(duration)
    finally:
        # Always disarm: a timer left running after an exception would keep
        # delivering SIGPROF to the process indefinitely.
        signal.setitimer(signal.ITIMER_PROF, 0)
    self.agent.log('Deactivating CPU profiler.')

    self.profile_duration += duration

    # Skip the ratio while no activity time has accumulated, to avoid a
    # ZeroDivisionError (e.g. the first call with duration == 0).
    if self.profile_duration:
        self.agent.log('CPU profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
def set_blocking_signal_threshold(self, seconds, action):
    """Record the blocking threshold and install the SIGALRM handler.

    ``seconds`` is stored on the instance; when it is not ``None``,
    ``action`` (or ``signal.SIG_DFL`` if ``action`` is ``None``) becomes the
    SIGALRM handler. If the ``signal`` module lacks ``setitimer`` an error
    is logged and nothing is changed.
    """
    if hasattr(signal, "setitimer"):
        self._blocking_signal_threshold = seconds
        if seconds is None:
            return
        handler = signal.SIG_DFL if action is None else action
        signal.signal(signal.SIGALRM, handler)
    else:
        gen_log.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
def set_blocking_signal_threshold(self, seconds, action):
    """Store ``seconds`` as the blocking threshold and set the SIGALRM handler.

    The handler is only touched when ``seconds`` is not ``None``; a ``None``
    ``action`` selects ``signal.SIG_DFL``. Logs an error and returns early
    when ``signal.setitimer`` is unavailable.
    """
    can_use_itimer = hasattr(signal, "setitimer")
    if not can_use_itimer:
        gen_log.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
        return

    self._blocking_signal_threshold = seconds
    if seconds is None:
        return

    if action is None:
        action = signal.SIG_DFL
    signal.signal(signal.SIGALRM, action)
def alarm(self, t=None):
    """Start a timer that is meant to fire only once.

    Works like ``signal.alarm`` but accepts sub-second delays. When ``t`` is
    omitted, ``self.signal_delay`` is used. The previous SIGALRM handler is
    saved in ``self.orig_handler`` and ``self.stop_timer`` is installed.
    """
    delay = self.signal_delay if t is None else t
    self.timer_fired = False
    self.orig_handler = signal.signal(signal.SIGALRM, self.stop_timer)
    # signal_period is not used here because only a single timer event may
    # fire; a fixed 1000 s repeat interval is passed instead.
    signal.setitimer(signal.ITIMER_REAL, delay, 1000)
def stop_timer(self, *args):
    """Mark the timer as fired, disarm it and restore the saved handler.

    Accepts and ignores any positional arguments, so it can be used directly
    as a signal handler.
    """
    self.timer_fired = True
    # Zero both the pending delay and the repeat interval.
    signal.setitimer(signal.ITIMER_REAL, 0, 0)
    saved_handler = self.orig_handler
    signal.signal(signal.SIGALRM, saved_handler)
def set_blocking_signal_threshold(self, seconds, action):
    """Remember the blocking threshold and, if enabled, hook SIGALRM.

    With ``seconds`` set to ``None`` only the stored threshold changes.
    Otherwise SIGALRM is handled by ``action``, falling back to
    ``signal.SIG_DFL`` when ``action`` is ``None``. An error is logged, and
    nothing else is done, if ``signal`` has no ``setitimer``.
    """
    if not hasattr(signal, "setitimer"):
        gen_log.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
        return

    self._blocking_signal_threshold = seconds
    enabled = seconds is not None
    if enabled:
        chosen = action
        if chosen is None:
            chosen = signal.SIG_DFL
        signal.signal(signal.SIGALRM, chosen)
def test_signal_handling_while_selecting(self):
    """A signal that arrives during a select() call must reach its handler.

    Registers a SIGALRM handler on ``self.loop``, schedules a single SIGALRM
    10 ms out, runs the loop until the handler stops it, and checks the
    handler ran exactly once.
    """
    hits = 0

    def on_alarm():
        nonlocal hits
        hits += 1
        self.loop.stop()

    self.loop.add_signal_handler(signal.SIGALRM, on_alarm)
    # One-shot timer: SIGALRM is sent once.
    signal.setitimer(signal.ITIMER_REAL, 0.01, 0)
    self.loop.run_forever()
    self.assertEqual(hits, 1)
def test_signal_handling_args(self):
    """Extra arguments given to add_signal_handler must reach the handler.

    Registers a SIGALRM handler with ``(42,)`` as extra arguments, sends one
    SIGALRM after 0.1 s, stops the loop after 0.5 s and checks the handler
    ran exactly once with those arguments.
    """
    expected_args = (42,)
    hits = 0

    def on_alarm(*received):
        nonlocal hits
        hits += 1
        self.assertEqual(received, expected_args)

    self.loop.add_signal_handler(signal.SIGALRM, on_alarm, *expected_args)
    # One-shot timer: SIGALRM is sent once.
    signal.setitimer(signal.ITIMER_REAL, 0.1, 0)
    self.loop.call_later(0.5, self.loop.stop)
    self.loop.run_forever()
    self.assertEqual(hits, 1)
def set_timeout():
    """Yield a ``make_timeout(sec, callback)`` factory and clean up afterwards.

    ``make_timeout`` installs a SIGALRM handler and arms a one-shot
    ``ITIMER_REAL`` for ``sec`` seconds; when it fires, the timer is
    cancelled and ``callback()`` is called. After the yield, any pending
    timer is cancelled and the SIGALRM handler that was active on entry is
    restored, so nothing leaks into later code.

    NOTE(review): no decorator is visible here; presumably this generator is
    used as a fixture or context manager -- confirm.
    """
    # May be None when the handler was not installed from Python.
    previous_handler = signal.getsignal(signal.SIGALRM)

    def make_timeout(sec, callback):
        def _callback(signum, frame):
            # Cancel through setitimer, the same API that armed the timer,
            # rather than mixing in signal.alarm().
            signal.setitimer(signal.ITIMER_REAL, 0)
            callback()

        signal.signal(signal.SIGALRM, _callback)
        signal.setitimer(signal.ITIMER_REAL, sec)

    try:
        yield make_timeout
    finally:
        # Teardown: a timer still pending here would otherwise fire later and
        # run a stale callback in unrelated code.
        signal.setitimer(signal.ITIMER_REAL, 0)
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def stop(self):
    """Drop the callback and undo the signal/timer setup.

    If a callable previous handler was saved in ``self.oldaction``, it is
    reinstalled for ``TIMER_SIGNAL`` and the saved ``self.oldtimer``
    ``(delay, interval)`` pair is re-armed. Otherwise ``TIMER_SIGNAL`` is set
    to be ignored.
    """
    self._callback = None
    if self.oldaction and callable(self.oldaction):
        signal.signal(TIMER_SIGNAL, self.oldaction)
        # setitimer() expects an interval-timer id (ITIMER_*), not a signal
        # number, so the module's TIMER constant is used here.
        signal.setitimer(
            TIMER,
            self.oldtimer[0],
            self.oldtimer[1],
        )
    else:
        signal.signal(TIMER_SIGNAL, signal.SIG_IGN)