def timeout(timeout_seconds):
def decorate(function):
message = "Timeout (%s sec) elapsed for test %s" % (timeout_seconds, function.__name__)
def handler(signum, frame):
raise TimeoutError(message)
def new_f(*args, **kwargs):
old = signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_seconds)
try:
function_result = function(*args, **kwargs)
finally:
signal.signal(signal.SIGALRM, old)
signal.alarm(0)
return function_result
new_f.func_name = function.func_name
return new_f
return decorate
python类alarm()的实例源码
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def __call__(self, *args, **keyArgs):
# If we have SIGALRM signal, use it to cause an exception if and
# when this function runs too long. Otherwise check the time taken
# after the method has returned, and throw an exception then.
if hasattr(signal, 'SIGALRM'):
old = signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.timeout)
try:
result = self.function(*args, **keyArgs)
finally:
signal.signal(signal.SIGALRM, old)
signal.alarm(0)
else:
startTime = time.time()
result = self.function(*args, **keyArgs)
timeElapsed = time.time() - startTime
if timeElapsed >= self.timeout:
self.handle_timeout(None, None)
return result
def send(self, commands):
responses = list()
try:
for command in to_list(commands):
signal.alarm(self._timeout)
self._history.append(str(command))
cmd = '%s\r' % str(command)
self.shell.sendall(cmd)
if self._timeout == 0:
return
responses.append(self.receive(command))
except socket.timeout:
raise ShellError("timeout trying to send command: %s" % cmd)
except socket.error:
exc = get_exception()
raise ShellError("problem sending command to host: %s" % to_native(exc))
return responses
def retry(func: Callable[[], T]) -> T:
""" Retry the function with 30 second timeouts until it works
- I've observed the getFirefoxDriver() without this freeze once (out of hundreds of runs...) so adding this
as a safety measure. """
for i in range(10):
if config.DEBUG and i > 0:
print("Retry #%s" % str(i))
def timeoutHandler(signum, frame):
raise TimeoutException("Timeout!")
signal.signal(signal.SIGALRM, timeoutHandler)
signal.alarm(delayTime)
try:
t = func()
signal.alarm(0)
return t
except TimeoutException:
pass
signal.alarm(0)
raise TimeoutException("Retried 10 times... Failed!")
def decorator(minutes=1, error_message=os.strerror(errno.ETIME)):
def dec(func):
def _handle_timeout(signum, frame):
msg = 'Timeout Error: %s' % (error_message)
add_test_note(msg)
raise TimeoutException(error_message)
def wrapper(*args, **kwargs):
if minutes > 0:
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(int(minutes * 60))
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return dec
def js6_to_js5(code):
global INITIALISED, babel, babelPresetEs2015
if not INITIALISED:
import signal, warnings, time
warnings.warn('\nImporting babel.py for the first time - this can take some time. \nPlease note that currently Javascript 6 in Js2Py is unstable and slow. Use only for tiny scripts!')
from .babel import babel as _babel
babel = _babel.Object.babel
babelPresetEs2015 = _babel.Object.babelPresetEs2015
# very weird hack. Somehow this helps babel to initialise properly!
try:
babel.transform('warmup', {'presets': {}})
signal.alarm(2)
def kill_it(a,b): raise KeyboardInterrupt('Better work next time!')
signal.signal(signal.SIGALRM, kill_it)
babel.transform('stuckInALoop', {'presets': babelPresetEs2015}).code
for n in range(3):
time.sleep(1)
except:
print("Initialised babel!")
INITIALISED = True
return babel.transform(code, {'presets': babelPresetEs2015}).code
def test_select_interrupt(self):
s = self.SELECTOR()
self.addCleanup(s.close)
rd, wr = self.make_socketpair()
orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
self.addCleanup(signal.alarm, 0)
signal.alarm(1)
s.register(rd, selectors.EVENT_READ)
t = time()
self.assertFalse(s.select(2))
self.assertLess(time() - t, 2.5)
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
# special test case for running ssh commands at module level
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def run_watcher():
site = pywikibot.Site(user="Embedded Data Bot")
redis = Redis(host="tools-redis")
signal.signal(signal.SIGALRM, on_timeout)
signal.alarm(TIMEOUT)
rc = site_rc_listener(site)
for change in rc:
signal.alarm(TIMEOUT)
if (
change['type'] == 'log' and
change['namespace'] == 6 and
change['log_type'] == 'upload'
):
redis.rpush(REDIS_KEY, json.dumps(change))
pywikibot.output("Exit - THIS SHOULD NOT HAPPEN")
def timeout(seconds, error_message="function call time out"):
def decorated(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message);
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(10)
return result
return functools.wraps(func)(wrapper)
return decorated
#}}}
def timeout(seconds, error_message="function call time out"):
def decorated(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message);
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(10)
return result
return functools.wraps(func)(wrapper)
return decorated
#}}}
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
def _on_signal_received(self, sig):
# Code below must not block to return to select.select() and catch
# next signals
if sig == _utils.SIGALRM:
self._alarm()
elif sig == signal.SIGTERM:
LOG.info('Caught SIGTERM signal, '
'graceful exiting of service %s' % self.title)
if self.service.graceful_shutdown_timeout > 0:
if os.name == "posix":
signal.alarm(self.service.graceful_shutdown_timeout)
else:
threading.Timer(self.service.graceful_shutdown_timeout,
self._alarm).start()
_utils.spawn(self.service._terminate)
elif sig == _utils.SIGHUP:
_utils.spawn(self.service._reload)
def timeout(seconds=10, error_message="Timer expired"):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
if 'GATHER_TIMEOUT' in globals():
if GATHER_TIMEOUT:
seconds = GATHER_TIMEOUT
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wrapper
return decorator
# --------------------------------------------------------------
def scan_DNS_zone(self, domain_name):
log.console_log("{}[*] Perfoming DNS Zone Scanning... {}".format(G, W))
log.console_log("{}[*] Please wait, maximum timeout for checking is 1 minutes {}".format(G, W))
signal.signal(signal.SIGALRM, self.timeLimitHandler)
signal.alarm(60)
try:
scan_list = str(list(Scanner(domain_name).scan()))
ns_record_list = []
mx_record_list = []
log.console_log("{}{}{}".format(G, scan_list.replace(",","\n"), W))
log.console_log("{}DNS Server:{}".format(G, W))
for ns in dns.resolver.query(domain_name, 'NS'):
log.console_log(G + ns.to_text() + W)
ns_record_list.append(ns.to_text())
log.console_log("{}MX Record:{}".format(G, W))
for ns in dns.resolver.query(domain_name, 'MX'):
log.console_log("{}{}{}".format(G, ns.to_text(), W))
mx_record_list.append(ns.to_text())
self.db.update_dns_zone(self.project_id, domain_name, util.clean_list_string(ns_record_list), util.clean_list_string(mx_record_list))
except Exception, exc:
print("{}[*] No response from server... SKIP!{}".format(R, W))
def test_rlock_acquire_interruption(self):
# Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
# in a deadlock.
# XXX this test can fail when the legacy (non-semaphore) implementation
# of locks is used in thread_pthread.h, see issue #11223.
oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
try:
rlock = thread.RLock()
# For reentrant locks, the initial acquisition must be in another
# thread.
def other_thread():
rlock.acquire()
thread.start_new_thread(other_thread, ())
# Wait until we can't acquire it without blocking...
while rlock.acquire(blocking=False):
rlock.release()
time.sleep(0.01)
signal.alarm(1)
t1 = time.time()
self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
dt = time.time() - t1
# See rationale above in test_lock_acquire_interruption
self.assertLess(dt, 3.0)
finally:
signal.signal(signal.SIGALRM, oldalrm)
def test_wakeup_fd_during(self):
self.check_wakeup("""def test():
import select
import time
TIMEOUT_FULL = 10
TIMEOUT_HALF = 5
signal.alarm(1)
before_time = time.time()
# We attempt to get a signal during the select call
try:
select.select([read], [], [], TIMEOUT_FULL)
except select.error:
pass
else:
raise Exception("select.error not raised")
after_time = time.time()
dt = after_time - before_time
if dt >= TIMEOUT_HALF:
raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
""")
def check_reentrant_write(self, data, **fdopen_kwargs):
def on_alarm(*args):
# Will be called reentrantly from the same thread
wio.write(data)
1/0
signal.signal(signal.SIGALRM, on_alarm)
r, w = os.pipe()
wio = self.io.open(w, **fdopen_kwargs)
try:
signal.alarm(1)
# Either the reentrant call to wio.write() fails with RuntimeError,
# or the signal handler raises ZeroDivisionError.
with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
while 1:
for i in range(100):
wio.write(data)
wio.flush()
# Make sure the buffer doesn't fill up and block further writes
os.read(r, len(data) * 100)
exc = cm.exception
if isinstance(exc, RuntimeError):
self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
finally:
wio.close()
os.close(r)
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
"""Check that a buffered read, when it gets interrupted (either
returning a partial result or EINTR), properly invokes the signal
handler and retries if the latter returned successfully."""
r, w = os.pipe()
fdopen_kwargs["closefd"] = False
def alarm_handler(sig, frame):
os.write(w, b"bar")
signal.signal(signal.SIGALRM, alarm_handler)
try:
rio = self.io.open(r, **fdopen_kwargs)
os.write(w, b"foo")
signal.alarm(1)
# Expected behaviour:
# - first raw read() returns partial b"foo"
# - second raw read() returns EINTR
# - third raw read() returns b"bar"
self.assertEqual(decode(rio.read(6)), "foobar")
finally:
rio.close()
os.close(w)
os.close(r)
def test_communicate_eintr(self):
# Issue #12493: communicate() should handle EINTR
def handler(signum, frame):
pass
old_handler = signal.signal(signal.SIGALRM, handler)
self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
# the process is running for 2 seconds
args = [sys.executable, "-c", 'import time; time.sleep(2)']
for stream in ('stdout', 'stderr'):
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
signal.alarm(1)
# communicate() will be interrupted by SIGALRM
process.communicate()
# context manager
def timeout(seconds):
"""
Implement timeout
:param seconds: timeout in seconds
:type seconds: int
"""
def timeout_handler(signum, frame):
"""Function to call on a timeout event"""
if signum or frame:
pass
original_handler = signal.signal(signal.SIGALRM, timeout_handler)
try:
signal.alarm(seconds)
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)
def connection_made(self, transport):
logger.info('gmy=>connection_made')
self.transport = transport
# Building client packet to send to SFF
packet = build_nsh_header(self.encapsulate_header_values,
self.base_header_values,
self.ctx_header_values)
udp_inner_packet = build_udp_packet(self.inner_header.inner_src_ip, self.inner_header.inner_dest_ip,
self.inner_header.inner_src_port,
self.inner_header.inner_dest_port, "test".encode('utf-8'))
logger.info("Sending %s packet to SFF: %s", self.encapsulate_type, (self.remote_sff_ip, self.remote_sff_port))
logger.info("Packet dump: %s", binascii.hexlify(packet))
# Send the packet
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(2)
try:
self.transport.sendto(packet + udp_inner_packet, (self.remote_sff_ip, self.remote_sff_port))
except socket.error as msg:
print('Failed to send packet. Error Code : ' + str(msg))
sys.exit()
except Exception as e:
logger.error("Error processing client: %s" % str(e))
def connection_made(self, transport):
self.transport = transport
# Building client dummy IP packet to send to SFF
packet = build_nsh_eth_header(self.encapsulate_header_values,
self.base_header_values,
self.ctx_header_values,
self.ethernet_values)
# packet = build_vxlan_header(self.encapsulate_header_values,
# self.ethernet_values)
udp_inner_packet = build_udp_packet(self.inner_header.inner_src_ip, self.inner_header.inner_dest_ip,
self.inner_header.inner_src_port,
self.inner_header.inner_dest_port, "test".encode('utf-8'))
gpe_nsh_ethernet_packet = packet + udp_inner_packet
logger.debug("Ethernet dump: ", binascii.hexlify(gpe_nsh_ethernet_packet))
logger.info("Sending %s packet to SFF: %s", self.encapsulate_type, (self.remote_sff_ip, self.remote_sff_port))
# Send the packet
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(2)
try:
self.transport.sendto(gpe_nsh_ethernet_packet, (self.remote_sff_ip, self.remote_sff_port))
except socket.error as msg:
print('Failed to send packet. Error Code : ' + str(msg))
sys.exit()
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def test_signals(self):
signalled_all.acquire()
self.spawnSignallingThread()
signalled_all.acquire()
# the signals that we asked the kernel to send
# will come back, but we don't know when.
# (it might even be after the thread exits
# and might be out of order.) If we haven't seen
# the signals yet, send yet another signal and
# wait for it return.
if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
signal.alarm(1)
signal.pause()
signal.alarm(0)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
thread.get_ident())
self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
thread.get_ident())
signalled_all.release()
def check_reentrant_write(self, data, **fdopen_kwargs):
def on_alarm(*args):
# Will be called reentrantly from the same thread
wio.write(data)
1//0
signal.signal(signal.SIGALRM, on_alarm)
r, w = os.pipe()
wio = self.io.open(w, **fdopen_kwargs)
try:
signal.alarm(1)
# Either the reentrant call to wio.write() fails with RuntimeError,
# or the signal handler raises ZeroDivisionError.
with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
while 1:
for i in range(100):
wio.write(data)
wio.flush()
# Make sure the buffer doesn't fill up and block further writes
os.read(r, len(data) * 100)
exc = cm.exception
if isinstance(exc, RuntimeError):
self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
finally:
wio.close()
os.close(r)
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
"""Check that a buffered read, when it gets interrupted (either
returning a partial result or EINTR), properly invokes the signal
handler and retries if the latter returned successfully."""
r, w = os.pipe()
fdopen_kwargs["closefd"] = False
def alarm_handler(sig, frame):
os.write(w, b"bar")
signal.signal(signal.SIGALRM, alarm_handler)
try:
rio = self.io.open(r, **fdopen_kwargs)
os.write(w, b"foo")
signal.alarm(1)
# Expected behaviour:
# - first raw read() returns partial b"foo"
# - second raw read() returns EINTR
# - third raw read() returns b"bar"
self.assertEqual(decode(rio.read(6)), "foobar")
finally:
rio.close()
os.close(w)
os.close(r)