python类SIGINT的实例源码

ui.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def finish(self):
        """
        Restore the original SIGINT handler after finishing.

        This should happen regardless of whether the progress display finishes
        normally, or gets interrupted.
        """
        super(InterruptibleMixin, self).finish()
        signal(SIGINT, self.original_handler)
ui.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_sigint(self, signum, frame):
        """
        Call self.finish() before delegating to the original SIGINT handler.

        This handler should only be in place while the progress display is
        active.
        """
        self.finish()
        self.original_handler(signum, frame)
kas.py 文件源码 项目:kas 作者: siemens 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def interruption():
    """
        Ignore SIGINT/SIGTERM in kas, let them be handled by our sub-processes
    """
    pass
helpers.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        super(SigIntMixin, self).__init__(*args, **kwargs)
        signal(SIGINT, self._sigint_handler)
ui.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Save the original SIGINT handler for later.
        """
        super(InterruptibleMixin, self).__init__(*args, **kwargs)

        self.original_handler = signal(SIGINT, self.handle_sigint)

        # If signal() returns None, the previous handler was not installed from
        # Python, and we cannot restore it. This probably should not happen,
        # but if it does, we must restore something sensible instead, at least.
        # The least bad option should be Python's default SIGINT handler, which
        # just raises KeyboardInterrupt.
        if self.original_handler is None:
            self.original_handler = default_int_handler
ui.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def finish(self):
        """
        Restore the original SIGINT handler after finishing.

        This should happen regardless of whether the progress display finishes
        normally, or gets interrupted.
        """
        super(InterruptibleMixin, self).finish()
        signal(SIGINT, self.original_handler)
ui.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle_sigint(self, signum, frame):
        """
        Call self.finish() before delegating to the original SIGINT handler.

        This handler should only be in place while the progress display is
        active.
        """
        self.finish()
        self.original_handler(signum, frame)
__init__.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def stop(self):
        os.kill(self.writer_proc.pid, signal.SIGINT)
test_01_DeviceManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_RogueService(self):
        devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
        import ossie.utils.popen as _popen
        from ossie.utils import redhawk
        rhdom= redhawk.attach(scatest.getTestDomainName())

        serviceName = "fake_1"
        args = []
        args.append("sdr/dev/services/fake/python/fake.py")
        args.append("DEVICE_MGR_IOR")
        args.append(self._orb.object_to_string(devMgr))
        args.append("SERVICE_NAME")
        args.append(serviceName)
        exec_file = "sdr/dev/services/fake/python/fake.py"
        external_process = _popen.Popen(args, executable=exec_file, cwd=os.getcwd(), preexec_fn=os.setpgrp)

        time.sleep(2)

        names=[serviceName]
        for svc in devMgr._get_registeredServices():
            self.assertNotEqual(svc, None)
            self.assertEqual(svc.serviceName in names, True)

        for svc in rhdom.services:
            self.assertNotEqual(svc, None)
            self.assertEqual(svc._instanceName in names, True)

        # Kill the external services
        os.kill(external_process.pid, signal.SIGINT)

        time.sleep(1)

        # check rogue service is removed
        self.assertEquals(len(devMgr._get_registeredServices()), 0)
        self.assertEquals(len(rhdom.services), 0)
test_09_DomainPersistence.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_EventAppPortConnectionSIGINT(self):
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._nb_domMgr, domMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
        self._nb_devMgr, devMgr = self.launchDeviceManager("/nodes/test_EventPortTestDevice_node/DeviceManager.dcd.xml")

        domainName = scatest.getTestDomainName()
        domMgr.installApplication("/waveforms/PortConnectFindByDomainFinderEvent/PortConnectFindByDomainFinderEvent.sad.xml")
        appFact = domMgr._get_applicationFactories()[0]
        app = appFact.create(appFact._get_name(), [], [])
        app.start()

        # Kill the domainMgr
        os.kill(self._nb_domMgr.pid, signal.SIGINT)
        if not self.waitTermination(self._nb_domMgr, 5.0):
            self.fail("Domain Manager Failed to Die")

        # Restart the Domain Manager (which should restore the old channel)
        self._nb_domMgr, domMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)

        newappFact = domMgr._get_applicationFactories()
        self.assertEqual(len(newappFact), 0)

        apps = domMgr._get_applications()
        self.assertEqual(len(apps), 0)

        devMgrs = domMgr._get_deviceManagers()
        self.assertEqual(len(devMgrs), 0)
test_01_nodeBooter.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_nodeBooterDomainNameOverride(self):
        """Test that we can allow the user to override the domainname with the --domainname argument."""
        domainName = scatest.getTestDomainName()
        domainMgrURI = URI.stringToName("%s/%s" % (domainName, domainName))
        # Test that we don't already have a bound domain
        try:
            domMgr = self._root.resolve(domainMgrURI)
            self.assertEqual(domMgr, None)
        except CosNaming.NamingContext.NotFound:
            pass # This exception is expected
        args = ["../../control/framework/nodeBooter","-D", "--domainname", domainName, "-debug", "9","--nopersist" ]
        nb = Popen(args, cwd=scatest.getSdrPath() )
        domMgr = self.waitDomainManager(domainMgrURI)
        self.assertNotEqual(domMgr, None)

        # Kill the nodebooter
        os.kill(nb.pid, signal.SIGINT)
        self.assertPredicateWithWait(lambda: nb.poll() == 0)

        # Test that we cleaned up the name; this should be automatic because
        # the naming context should be empty.
        try:
            domMgr = self._root.resolve(domainMgrURI)
            self.assertEqual(domMgr, None)
        except CosNaming.NamingContext.NotFound:
            pass # This exception is expected
scatest.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def terminateChildrenPidOnly(self, pid, signals=(signal.SIGINT, signal.SIGTERM)):
        ls = commands.getoutput('ls /proc')
        entries = ls.split('\n')
        for entry in entries:
            filename = '/proc/'+entry+'/status'
            try:
                fp = open(filename,'r')
                stuff = fp.readlines()
                fp.close()
            except:
                continue
            ret = ''
            for line in stuff:
                if 'PPid' in line:
                    ret=line
                    break
            if ret != '':
                parentPid = ret.split('\t')[-1][:-1]
                if parentPid == pid:
                    self.terminateChildrenPidOnly(entry, signals)
        filename = '/proc/'+pid+'/status'
        for sig in signals:
            try:
                os.kill(int(pid), sig)
            except:
                continue
            done = False
            attemptCount = 0
            while not done:
                try:
                    fp = open(filename,'r')
                    fp.close()
                    attemptCount += 1
                    if attemptCount == 10:
                        break
                    time.sleep(0.1)
                except:
                    done = True
            if not done:
                continue
scatest.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def terminateChildren(self, child, signals=(signal.SIGINT, signal.SIGTERM)):
        ls = commands.getoutput('ls /proc')
        entries = ls.split('\n')
        for entry in entries:
            filename = '/proc/'+entry+'/status'
            try:
                fp = open(filename,'r')
                stuff = fp.readlines()
            except:
                continue
            for line in stuff:
                if 'PPid' in line:
                    ret=line
                    break
            if ret != '':
                parentPid = int(ret.split('\t')[-1][:-1])
                if parentPid == child.pid:
                    self.terminateChildrenPidOnly(entry, signals)

        if child.poll() != None:
            return
        try:
            for sig in signals:
                os.kill(child.pid, sig)
                if self.waitTermination(child):
                    break
            child.wait()
        except OSError:
            pass
base.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __terminate_process( process, signals=(_signal.SIGINT, _signal.SIGTERM, _signal.SIGKILL) ):
        if process and process.poll() != None:
           return
        try:
            for sig in signals:
                _os.kill(process.pid, sig)
                if __waitTermination(process):
                    break
            process.wait()
        except OSError, e:
            pass
        finally:
            pass
io_helpers.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self):
        helperBase.__init__(self)
        self.usesPortIORString = None
        self._providesPortDict = {}
        self._processes = {}
        self._STOP_SIGNALS = ((_signal.SIGINT, 1),
                              (_signal.SIGTERM, 5),
                              (_signal.SIGKILL, None))
loop.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run(*steps):
    """
    Helper to run one or more async functions synchronously, with graceful
    handling of SIGINT / Ctrl-C.

    Returns the return value of the last function.
    """
    if not steps:
        return

    task = None
    run._sigint = False  # function attr to allow setting from closure
    loop = asyncio.get_event_loop()

    def abort():
        task.cancel()
        run._sigint = True

    added = False
    try:
        loop.add_signal_handler(signal.SIGINT, abort)
        added = True
    except (ValueError, OSError, RuntimeError) as e:
        # add_signal_handler doesn't work in a thread
        if 'main thread' not in str(e):
            raise
    try:
        for step in steps:
            task = loop.create_task(step)
            loop.run_until_complete(asyncio.wait([task], loop=loop))
            if run._sigint:
                raise KeyboardInterrupt()
            if task.exception():
                raise task.exception()
        return task.result()
    finally:
        if added:
            loop.remove_signal_handler(signal.SIGINT)
zookeeper.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _exit_gracefully(self, sig, frame):
        self.close()
        if sig == signal.SIGINT:
            self.original_int_handler(sig, frame)
        elif sig == signal.SITERM:
            self.original_term_handler(sig, frame)
zookeeper.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def register_signal_handlers(self):
        self.original_int_handler = signal.getsignal(signal.SIGINT)
        self.original_term_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGINT, self._exit_gracefully)
        signal.signal(signal.SIGTERM, self._exit_gracefully)
algorithm.py 文件源码 项目:dl4mt-multi 作者: nyu-dl 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _handle_epoch_interrupt(self, signal_number, frame):
        # Try to complete the current epoch if user presses CTRL + C
        logger.warning('Received epoch interrupt signal.' +
                       epoch_interrupt_message)
        signal.signal(signal.SIGINT, self._handle_batch_interrupt)
        self.log.current_row['epoch_interrupt_received'] = True
        # Add a record to the status. Unlike the log record it will be
        # easy to access at later iterations.
        self.status['epoch_interrupt_received'] = True
algorithm.py 文件源码 项目:dl4mt-multi 作者: nyu-dl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _restore_signal_handlers(self):
        signal.signal(signal.SIGINT, self.original_sigint_handler)
        signal.signal(signal.SIGTERM, self.original_sigterm_handler)


问题


面经


文章

微信
公众号

扫码关注公众号