def test_interrupt_main(self):
    # A thread target that calls interrupt_main is handed to
    # start_new_thread; the test expects KeyboardInterrupt to come out of
    # that start_new_thread call.
    def fire_interrupt():
        _thread.interrupt_main()

    no_args = tuple()
    with self.assertRaises(KeyboardInterrupt):
        _thread.start_new_thread(fire_interrupt, no_args)
# python类interrupt_main()的实例源码 (example source code using Python's interrupt_main())
def test_interrupt_in_main(self):
    # Calling interrupt_main directly from the main thread is expected to
    # raise KeyboardInterrupt right away.
    with self.assertRaises(KeyboardInterrupt):
        _thread.interrupt_main()
def test_interrupt_main(self):
    # A thread target that calls interrupt_main is handed to
    # start_new_thread; the test expects KeyboardInterrupt to come out of
    # that start_new_thread call.
    def fire_interrupt():
        _thread.interrupt_main()

    no_args = tuple()
    with self.assertRaises(KeyboardInterrupt):
        _thread.start_new_thread(fire_interrupt, no_args)
def test_interrupt_in_main(self):
    # Calling interrupt_main directly from the main thread is expected to
    # raise KeyboardInterrupt right away.
    with self.assertRaises(KeyboardInterrupt):
        _thread.interrupt_main()
def test_interrupt_main(self):
    # A thread target that calls interrupt_main is handed to
    # start_new_thread; the test expects KeyboardInterrupt to come out of
    # that start_new_thread call.
    def fire_interrupt():
        _thread.interrupt_main()

    no_args = tuple()
    with self.assertRaises(KeyboardInterrupt):
        _thread.start_new_thread(fire_interrupt, no_args)
def test_interrupt_in_main(self):
    # Calling interrupt_main directly from the main thread is expected to
    # raise KeyboardInterrupt right away.
    with self.assertRaises(KeyboardInterrupt):
        _thread.interrupt_main()
def test_interrupt_main(self):
    # A thread target that calls interrupt_main is handed to
    # start_new_thread; the test expects KeyboardInterrupt to come out of
    # that start_new_thread call.
    def fire_interrupt():
        _thread.interrupt_main()

    no_args = tuple()
    with self.assertRaises(KeyboardInterrupt):
        _thread.start_new_thread(fire_interrupt, no_args)
def test_interrupt_in_main(self):
    # Calling interrupt_main directly from the main thread is expected to
    # raise KeyboardInterrupt right away.
    with self.assertRaises(KeyboardInterrupt):
        _thread.interrupt_main()
def time_limit(seconds, msg=''):
    """Bound the running time of the code executed at this generator's ``yield``.

    The function has the single-``yield`` shape that
    ``contextlib.contextmanager`` expects. No decorator is visible in this
    part of the file, so presumably it is applied elsewhere -- confirm
    against the callers.

    Args:
        seconds: Time budget. Handed to ``threading.Timer`` on Windows and
            to ``signal.alarm`` (which requires an integer) elsewhere.
        msg: Text interpolated into the timeout error message.

    Raises:
        TimeoutException: If the budget runs out before the guarded code
            finishes.
        KeyboardInterrupt: On Windows, re-raised unchanged when the
            interrupt did not come from the timeout timer.
    """
    if sys.platform == 'win32':
        # Windows has no SIGALRM, so a timer thread interrupts the main
        # thread instead; this works less well than the signal approach.
        timed_out = threading.Event()

        def timeout_func():
            # Mark the interrupt as timer-originated *before* raising it so
            # a genuine Ctrl-C is not misreported as a timeout.
            timed_out.set()
            _thread.interrupt_main()

        timer = threading.Timer(seconds, timeout_func)
        timer.start()
        try:
            yield
        except KeyboardInterrupt:
            # Original author's note: KeyboardInterrupt does not interrupt
            # time.sleep(), because KeyboardInterrupt is handled by the
            # Python interpreter while time.sleep() calls a system function.
            if not timed_out.is_set():
                # Not caused by our timer: let the user's interrupt through.
                raise
            raise TimeoutException("Timed out for operation {}".format(msg))
        finally:
            # If the guarded code finished in time, the timer is cancelled.
            timer.cancel()
    else:
        def signal_handler(signum, frame):
            # Same wording as the Windows branch ("operation", not "option").
            raise TimeoutException("Timed out for operation {}".format(msg))

        # Remember whatever handler was installed so it can be put back;
        # otherwise this handler would stay active after the block exits.
        previous_handler = signal.signal(signal.SIGALRM, signal_handler)
        try:
            signal.alarm(seconds)
            yield
        finally:
            # Disarm any pending alarm before swapping the handler back.
            signal.alarm(0)
            # signal.signal() returns None when the previous handler was not
            # installed from Python; that value cannot be passed back in.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)