def finish(self):
    """
    Restore the original SIGINT handler after finishing.

    This should happen regardless of whether the progress display finishes
    normally, or gets interrupted, so the restore is done in a ``finally``
    clause: an exception raised by the parent class's ``finish()`` still
    propagates to the caller, but no longer leaves our handler installed.
    """
    try:
        super(InterruptibleMixin, self).finish()
    finally:
        # Put back whatever handler was saved in self.original_handler.
        signal(SIGINT, self.original_handler)
# Example source code using Python's SIGINT
def handle_sigint(self, signum, frame):
    """
    SIGINT handler: finish first, then hand the signal on.

    Calls self.finish() and afterwards invokes self.original_handler with
    the same (signum, frame) arguments this handler received.
    """
    self.finish()
    # Looked up only after finish() has run, exactly as a direct call would.
    delegate = self.original_handler
    delegate(signum, frame)
def interruption():
    """
    Deliberate no-op: SIGINT/SIGTERM are ignored in kas itself and left
    to be handled by our sub-processes.
    """
    return None
def __init__(self, *args, **kwargs):
    """
    Initialise the parent class, then install self._sigint_handler as the
    process's SIGINT handler.
    """
    super(SigIntMixin, self).__init__(*args, **kwargs)
    # Read the bound handler only after the parent initialiser has run.
    on_sigint = self._sigint_handler
    signal(SIGINT, on_sigint)
def __init__(self, *args, **kwargs):
    """
    Install self.handle_sigint for SIGINT and remember the handler it
    replaces in self.original_handler, so it can be restored later.
    """
    super(InterruptibleMixin, self).__init__(*args, **kwargs)
    previous = signal(SIGINT, self.handle_sigint)

    # signal() returns None when the previous handler was not installed
    # from Python; such a handler cannot be restored. That probably should
    # not happen, but if it does we fall back to Python's default SIGINT
    # handler, which just raises KeyboardInterrupt, as the least bad option.
    self.original_handler = (
        default_int_handler if previous is None else previous
    )
def finish(self):
    """
    Restore the original SIGINT handler after finishing.

    This should happen regardless of whether the progress display finishes
    normally, or gets interrupted, so the restore is done in a ``finally``
    clause: an exception raised by the parent class's ``finish()`` still
    propagates to the caller, but no longer leaves our handler installed.
    """
    try:
        super(InterruptibleMixin, self).finish()
    finally:
        # Put back whatever handler was saved in self.original_handler.
        signal(SIGINT, self.original_handler)
def handle_sigint(self, signum, frame):
    """
    SIGINT handler: finish first, then hand the signal on.

    Calls self.finish() and afterwards invokes self.original_handler with
    the same (signum, frame) arguments this handler received.
    """
    self.finish()
    # Looked up only after finish() has run, exactly as a direct call would.
    delegate = self.original_handler
    delegate(signum, frame)
def stop(self):
    """Send SIGINT to the process referenced by self.writer_proc."""
    writer_pid = self.writer_proc.pid
    os.kill(writer_pid, signal.SIGINT)
def test_RogueService(self):
    """
    Start the "fake" service script as an external process pointed at a
    freshly launched device manager, check that every service listed by
    the device manager and by the attached domain is the expected one,
    then SIGINT the process and check that both listings are empty.
    """
    _devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
    import ossie.utils.popen as _popen
    from ossie.utils import redhawk
    domain = redhawk.attach(scatest.getTestDomainName())

    serviceName = "fake_1"
    # Command line for the service script: IOR of the device manager plus
    # the name under which the service is expected to show up.
    command = [
        "sdr/dev/services/fake/python/fake.py",
        "DEVICE_MGR_IOR",
        self._orb.object_to_string(devMgr),
        "SERVICE_NAME",
        serviceName,
    ]
    script_path = "sdr/dev/services/fake/python/fake.py"
    rogue = _popen.Popen(command, executable=script_path, cwd=os.getcwd(), preexec_fn=os.setpgrp)

    # Fixed delay before inspecting the registrations.
    time.sleep(2)

    expected_names = [serviceName]
    for registered in devMgr._get_registeredServices():
        self.assertNotEqual(registered, None)
        self.assertEqual(registered.serviceName in expected_names, True)

    for domain_service in domain.services:
        self.assertNotEqual(domain_service, None)
        self.assertEqual(domain_service._instanceName in expected_names, True)

    # Kill the external services
    os.kill(rogue.pid, signal.SIGINT)
    time.sleep(1)

    # check rogue service is removed
    self.assertEquals(len(devMgr._get_registeredServices()), 0)
    self.assertEquals(len(domain.services), 0)
def test_EventAppPortConnectionSIGINT(self):
    """
    Launch a domain manager and a device manager, create and start an
    application, SIGINT the domain manager, relaunch it with the same
    endpoint and database, and check that the relaunched manager reports
    no application factories, applications or device managers.
    """
    self.localEvent = threading.Event()
    self.eventFlag = False

    self._nb_domMgr, domain_manager = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
    self._nb_devMgr, _device_manager = self.launchDeviceManager("/nodes/test_EventPortTestDevice_node/DeviceManager.dcd.xml")

    # The result is not used further in this test.
    _domain_name = scatest.getTestDomainName()

    domain_manager.installApplication("/waveforms/PortConnectFindByDomainFinderEvent/PortConnectFindByDomainFinderEvent.sad.xml")
    factory = domain_manager._get_applicationFactories()[0]
    application = factory.create(factory._get_name(), [], [])
    application.start()

    # Kill the domainMgr
    os.kill(self._nb_domMgr.pid, signal.SIGINT)
    terminated = self.waitTermination(self._nb_domMgr, 5.0)
    if not terminated:
        self.fail("Domain Manager Failed to Die")

    # Restart the Domain Manager (which should restore the old channel)
    self._nb_domMgr, domain_manager = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)

    self.assertEqual(len(domain_manager._get_applicationFactories()), 0)
    self.assertEqual(len(domain_manager._get_applications()), 0)
    self.assertEqual(len(domain_manager._get_deviceManagers()), 0)
def test_nodeBooterDomainNameOverride(self):
    """Check that nodeBooter honours a domain name given with --domainname."""
    domainName = scatest.getTestDomainName()
    domainMgrURI = URI.stringToName("%s/%s" % (domainName, domainName))

    def check_domain_not_bound():
        # Resolving must either yield None or raise NotFound (expected).
        try:
            resolved = self._root.resolve(domainMgrURI)
            self.assertEqual(resolved, None)
        except CosNaming.NamingContext.NotFound:
            pass

    # Test that we don't already have a bound domain
    check_domain_not_bound()

    command = ["../../control/framework/nodeBooter","-D", "--domainname", domainName, "-debug", "9","--nopersist" ]
    booter = Popen(command, cwd=scatest.getSdrPath() )

    domMgr = self.waitDomainManager(domainMgrURI)
    self.assertNotEqual(domMgr, None)

    # Kill the nodebooter
    os.kill(booter.pid, signal.SIGINT)
    self.assertPredicateWithWait(lambda: booter.poll() == 0)

    # Test that we cleaned up the name; this should be automatic because
    # the naming context should be empty.
    check_domain_not_bound()
def terminateChildrenPidOnly(self, pid, signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Terminate the process ``pid`` and, before it, every process whose
    /proc status file names it as parent (recursively).

    pid     -- process id as a string; it is concatenated into a /proc path
               and compared against the text of the PPid field.
    signals -- signals to try, in order, on each process.

    After each signal the process's /proc status file is polled up to ten
    times (0.1 s apart); once the file can no longer be opened the process
    is treated as gone and no further signals are sent. Failures to read
    /proc entries or to deliver a signal are ignored (best effort), but
    KeyboardInterrupt and SystemExit are no longer swallowed.
    """
    # Pass 1: recurse into every process that lists ``pid`` as its parent.
    for entry in commands.getoutput('ls /proc').split('\n'):
        try:
            with open('/proc/' + entry + '/status', 'r') as fp:
                status_lines = fp.readlines()
        except (IOError, OSError):
            # Not a process directory, or the process vanished meanwhile.
            continue

        ppid_line = ''
        for line in status_lines:
            if 'PPid' in line:
                ppid_line = line
                break
        if ppid_line != '':
            # Last tab-separated field, minus the trailing newline.
            parentPid = ppid_line.split('\t')[-1][:-1]
            if parentPid == pid:
                self.terminateChildrenPidOnly(entry, signals)

    # Pass 2: signal the process itself, escalating through ``signals``.
    filename = '/proc/' + pid + '/status'
    for sig in signals:
        try:
            os.kill(int(pid), sig)
        except Exception:
            # Best effort: could not deliver this signal, try the next one.
            continue

        exited = False
        for attempt in range(10):
            try:
                with open(filename, 'r'):
                    pass
            except (IOError, OSError):
                # Status file is gone, so the process has exited.
                exited = True
                break
            if attempt < 9:
                time.sleep(0.1)
        if exited:
            # Do not keep signalling a pid that no longer belongs to us.
            return
def terminateChildren(self, child, signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Terminate the direct children of ``child`` (via
    terminateChildrenPidOnly, which also handles their descendants) and
    then ``child`` itself.

    child   -- object exposing ``pid``, ``poll()`` and ``wait()``.
    signals -- signals to try, in order.

    The PPid line is looked up afresh for every /proc entry, so an entry
    without one is skipped instead of reusing the previous entry's value
    (or failing on an unbound variable for the first entry). If ``child``
    has already exited nothing is sent to it. OSError raised while
    signalling or waiting is ignored.
    """
    for entry in commands.getoutput('ls /proc').split('\n'):
        try:
            with open('/proc/' + entry + '/status', 'r') as fp:
                status_lines = fp.readlines()
        except (IOError, OSError):
            # Not a process directory, or the process vanished meanwhile.
            continue

        ppid_line = ''
        for line in status_lines:
            if 'PPid' in line:
                ppid_line = line
                break
        if ppid_line != '':
            # Last tab-separated field, minus the trailing newline.
            parentPid = int(ppid_line.split('\t')[-1][:-1])
            if parentPid == child.pid:
                self.terminateChildrenPidOnly(entry, signals)

    if child.poll() is not None:
        return
    try:
        for sig in signals:
            os.kill(child.pid, sig)
            if self.waitTermination(child):
                break
        child.wait()
    except OSError:
        pass
def __terminate_process( process, signals=(_signal.SIGINT, _signal.SIGTERM, _signal.SIGKILL) ):
    """
    Stop ``process`` by sending it each of ``signals`` in turn until
    __waitTermination() reports that it has terminated, then wait on it.

    process -- object exposing ``pid``, ``poll()`` and ``wait()``, or None.
    signals -- signals to try, in order.

    Returns immediately when ``process`` is None or has already exited
    (``poll()`` is not None). OSError raised while signalling or waiting
    is ignored.
    """
    if process is None or process.poll() is not None:
        return
    try:
        for sig in signals:
            _os.kill(process.pid, sig)
            if __waitTermination(process):
                break
        process.wait()
    except OSError:
        pass
def __init__(self):
    """
    Initialise the helperBase part and set this object's starting state:
    no uses-port IOR string, empty provides-port and process tables, and
    the ordered table of stop signals.
    """
    helperBase.__init__(self)
    self.usesPortIORString = None
    self._providesPortDict = dict()
    self._processes = dict()
    # (signal, value) pairs from gentlest to harshest.
    # NOTE(review): the second element looks like a per-signal wait in
    # seconds, with None for the final SIGKILL; confirm against the code
    # that consumes _STOP_SIGNALS.
    stop_sequence = [
        (_signal.SIGINT, 1),
        (_signal.SIGTERM, 5),
        (_signal.SIGKILL, None),
    ]
    self._STOP_SIGNALS = tuple(stop_sequence)
def run(*steps):
    """
    Helper to run one or more async functions synchronously, with graceful
    handling of SIGINT / Ctrl-C.

    Each step is wrapped in a task on the event loop and run to completion
    before the next one is started.

    Returns the return value of the last function, or None when called
    without any steps.

    Raises KeyboardInterrupt if SIGINT was received while a step was
    running (the running task is cancelled first), and re-raises the
    exception of the first step that fails.
    """
    if not steps:
        return

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions no longer create a loop implicitly.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    task = None
    interrupted = False

    def abort():
        nonlocal interrupted
        interrupted = True
        # The handler can only do something useful once a task exists.
        if task is not None:
            task.cancel()

    added = False
    try:
        loop.add_signal_handler(signal.SIGINT, abort)
        added = True
    except NotImplementedError:
        # Event loops without signal support (e.g. on Windows): run the
        # steps anyway, just without the graceful SIGINT handling.
        pass
    except (ValueError, OSError, RuntimeError) as e:
        # add_signal_handler doesn't work in a thread
        if 'main thread' not in str(e):
            raise

    try:
        for step in steps:
            task = loop.create_task(step)
            # asyncio.wait() never raises the task's own exception, so the
            # outcome can be inspected below. Its ``loop`` argument was
            # removed in Python 3.10 and is not needed here.
            loop.run_until_complete(asyncio.wait([task]))
            if interrupted:
                raise KeyboardInterrupt()
            if task.exception():
                raise task.exception()
        return task.result()
    finally:
        if added:
            loop.remove_signal_handler(signal.SIGINT)
def _exit_gracefully(self, sig, frame):
    """
    Signal handler: close this object, then pass the signal on to the
    handler that was saved for it.

    sig   -- number of the signal being handled.
    frame -- current stack frame, as supplied to signal handlers.

    SIGINT is forwarded to self.original_int_handler and SIGTERM to
    self.original_term_handler; other signals are not forwarded. A saved
    handler is called directly only if it is callable. If it is
    signal.SIG_DFL, the default disposition is reinstated and the signal
    is re-sent to this process so that the default action happens. Any
    other saved value (signal.SIG_IGN or None) means there is nothing to
    delegate to.
    """
    self.close()

    if sig == signal.SIGINT:
        previous = self.original_int_handler
    elif sig == signal.SIGTERM:
        previous = self.original_term_handler
    else:
        return

    if callable(previous):
        previous(sig, frame)
    elif previous == signal.SIG_DFL:
        import os
        signal.signal(sig, signal.SIG_DFL)
        os.kill(os.getpid(), sig)
def register_signal_handlers(self):
    """
    Record the handlers currently set for SIGINT and SIGTERM on this
    object, then install self._exit_gracefully for both signals.
    """
    # Phase 1: remember what is installed right now.
    self.original_int_handler = signal.getsignal(signal.SIGINT)
    self.original_term_handler = signal.getsignal(signal.SIGTERM)

    # Phase 2: take over both signals, SIGINT first.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, self._exit_gracefully)
def _handle_epoch_interrupt(self, signal_number, frame):
    """
    SIGINT handler for the first CTRL + C: log a warning, switch the
    SIGINT handler to self._handle_batch_interrupt, and flag the
    interrupt in both the current log row and the status.
    """
    flag = 'epoch_interrupt_received'

    # Try to complete the current epoch if user presses CTRL + C
    warning_text = 'Received epoch interrupt signal.' + epoch_interrupt_message
    logger.warning(warning_text)

    # From now on SIGINT is routed to the batch-interrupt handler.
    signal.signal(signal.SIGINT, self._handle_batch_interrupt)

    self.log.current_row[flag] = True
    # Add a record to the status. Unlike the log record it will be
    # easy to access at later iterations.
    self.status[flag] = True
def _restore_signal_handlers(self):
    """
    Reinstall the handlers stored in self.original_sigint_handler and
    self.original_sigterm_handler for SIGINT and SIGTERM, in that order.
    """
    saved = (
        (signal.SIGINT, 'original_sigint_handler'),
        (signal.SIGTERM, 'original_sigterm_handler'),
    )
    for signum, attribute in saved:
        # Each attribute is read just before its signal is restored.
        signal.signal(signum, getattr(self, attribute))