python类SIGTERM的实例源码

session.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def terminate(self):
        try:
            os.kill(self.pid, signal.SIGTERM)
        except OSError, e:
            if e.errno == errno.ESRCH:
                return # process already killed by someone else
            if e.errno == errno.EPERM:
                # this can only happen if the original session was started as
                # a different user. e.g. if the broker has been restarted with
                # --demote=<some other user>. but don't worry too much about it
                # as sessions are eventually terminated anyway.
                print(
                    'WARNING: session with PID=%d not terminated because it '
                    'is owned by a different user. did you restart a broker '
                    'as a different user?' % self.pid
                )
                return
            raise e
kas.py 文件源码 项目:kas 作者: siemens 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def kas(argv):
    """
        The main entry point of kas.
    """
    create_logger()

    parser = kas_get_argparser()
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    logging.info('%s %s started', os.path.basename(sys.argv[0]), __version__)

    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, interruption)
    atexit.register(_atexit_handler)

    for plugin in getattr(kasplugin, 'plugins', []):
        if plugin().run(args):
            return

    parser.print_help()
server.py 文件源码 项目:py-abci 作者: davebryson 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """Option to calling manually calling start()/stop(). This will start
        the server and watch for signals to stop the server"""
        self.server.start()
        log.info(" ABCIServer started on port: {}".format(self.port))
        # wait for interrupt
        evt = Event()
        gevent.signal(signal.SIGQUIT, evt.set)
        gevent.signal(signal.SIGTERM, evt.set)
        gevent.signal(signal.SIGINT, evt.set)
        evt.wait()
        log.info("Shutting down server")
        self.server.stop()

    # TM will spawn off 3 connections: mempool, consensus, query
    # If an error happens in 1 it still leaves the others open which
    # means you don't have all the connections available to TM
recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_err_in_fun(self):
            # Test that the original signal this process was hit with
            # is not returned in case fun raise an exception. Instead,
            # we're supposed to see retsig = 1.
            ret = pyrun(textwrap.dedent(
                """
                import os, signal, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    sys.stderr = os.devnull
                    1 / 0

                sig = signal.SIGTERM if os.name == 'posix' else \
                    signal.CTRL_C_EVENT
                mod.register_exit_fun(foo)
                os.kill(os.getpid(), sig)
                """.format(os.path.abspath(__file__), TESTFN)
            ))
            if POSIX:
                self.assertEqual(ret, 1)
                assert ret != signal.SIGTERM, strfsig(ret)
recipe-576667.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def subprocess_terminate( proc ) :
    try:
        proc.terminate()
    except AttributeError:
        print " no terminate method to Popen.."
        try:
            import signal
            os.kill( proc.pid , signal.SIGTERM)
        except AttributeError:
            print "  no os.kill, using win32api.."
            try:
                import win32api
                PROCESS_TERMINATE = 1
                handle = win32api.OpenProcess( PROCESS_TERMINATE, False, proc.pid)
                win32api.TerminateProcess(handle,-1)
                win32api.CloseHandle(handle)
            except ImportError:
                print "  ERROR: could not terminate process."
linux_pty.py 文件源码 项目:TerminalView 作者: Wramberg 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Stop the shell
        """
        if self.is_running():
            try:
                os.kill(self._shell_pid, signal.SIGTERM)
            except OSError:
                pass

        start = time.time()
        while self.is_running() and (time.time() < (start + 0.2)):
            time.sleep(0.05)

        if self.is_running():
            utils.ConsoleLogger.log("Failed to stop shell process")
        else:
            utils.ConsoleLogger.log("Shell process stopped")
daemonize.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def stop(self):

        pid = None

        if not os.path.exists(self.pidfile):

            logger.debug('pidfile not exist:' + self.pidfile)
            return

        try:
            pid = _read_file(self.pidfile)
            pid = int(pid)
            os.kill(pid, signal.SIGTERM)
            return

        except Exception as e:
            logger.warn('{e} while get and kill pid={pid}'.format(
                e=repr(e), pid=pid))
events.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _kill_ssh(self):
        if self.ssh_pid > 1:
            try:
                os.kill(self.ssh_pid, signal.SIGTERM)
                os.waitpid(self.ssh_pid, 0)
            except OSError, e:
                if e.errno not in [errno.ECHILD, errno.ESRCH]:
                    raise Exception('unhandled errno: %d' % e.errno)
            self.self_pid = -1
            try:
                os.close(self.ssh_fd)
            except OSError, e:
                if e.errno == errno.EBADF:
                    pass # already closed
                else:
                    print 'WHAT?', e
                    raise e
session.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def terminate(self):
        try:
            os.kill(self.pid, signal.SIGTERM)
        except OSError, e:
            if e.errno == errno.ESRCH:
                return # process already killed by someone else
            if e.errno == errno.EPERM:
                # this can only happen if the original session was started as
                # a different user. e.g. if the broker has been restarted with
                # --demote=<some other user>. but don't worry too much about it
                # as sessions are eventually terminated anyway.
                print(
                    'WARNING: session with PID=%d not terminated because it '
                    'is owned by a different user. did you restart a broker '
                    'as a different user?' % self.pid
                )
                return
            raise e
events.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _kill_ssh(self):
        if self.ssh_pid > 1:
            try:
                os.kill(self.ssh_pid, signal.SIGTERM)
                os.waitpid(self.ssh_pid, 0)
            except OSError, e:
                if e.errno not in [errno.ECHILD, errno.ESRCH]:
                    raise Exception('unhandled errno: %d' % e.errno)
            self.self_pid = -1
            try:
                os.close(self.ssh_fd)
            except OSError, e:
                if e.errno == errno.EBADF:
                    pass # already closed
                else:
                    print 'WHAT?', e
                    raise e
updater.py 文件源码 项目:mobot 作者: JokerQyou 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)):
        """
        Blocks until one of the signals are received and stops the updater

        Args:
            stop_signals: Iterable containing signals from the signal module
                that should be subscribed to. Updater.stop() will be called on
                receiving one of those signals. Defaults to (SIGINT, SIGTERM,
                SIGABRT)
        """
        for sig in stop_signals:
            signal(sig, self.signal_handler)

        self.is_idle = True

        while self.is_idle:
            sleep(1)
test_rolld.py 文件源码 项目:rolld 作者: Hipo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def exit_test():
    global periodic_checker
    if periodic_checker:
        periodic_checker.stop()
    os.kill(rolld_proc.pid, signal.SIGTERM)
    os.kill(nginx_proc.pid, signal.SIGTERM)
    # IOLoop.instance().add_timeout(time.time() + 5, partial(sys.exit, 0))
    # check if we have zombies left
    try:
        lines = subprocess.check_output('ps auxw | grep python | grep app.py | grep -v grep', shell=True)
        print lines
        assert len(lines) == 0
    except subprocess.CalledProcessError as grepexc:
        # grep shouldnt find anything so exit code should be 1
        if grepexc.returncode == 1:
            pass
        else:
            raise
    # if everything is fine, just stop our ioloop now.
    IOLoop.current().stop()
tcpmon.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def monitor_retransmit():
    global proc

    with proc.stdout:
        for line in iter (proc.stdout.readline, b''):
            if stop_flag:
                break

            tokens = line.split()
            if len(tokens) < 5 or (tokens[2] == '-:-' and tokens[4] == '-:-') \
                    or tokens[0] == "TIME":
                continue

            key = tokens[1] + ':' + tokens[3] + ':6'

            mon_lock.acquire()
            if key not in mon_flows:
                print "updating mon_flows", key
                mon_flows.update ({key: 1})
            mon_lock.release()

    os.kill (proc.pid, signal.SIGTERM)
    proc.wait() # wait for the subprocess to exit
    proc = None
Task.py 文件源码 项目:mongodb_consistent_backup 作者: Percona-Lab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, task_name, manager, config, timer, base_dir, backup_dir, **kwargs):
        self.task_name  = task_name
        self.manager    = manager
        self.config     = config
        self.timer      = timer
        self.base_dir   = base_dir
        self.backup_dir = backup_dir
        self.args       = kwargs
        self.verbose    = self.config.verbose

        self.runnning  = False
        self.stopped   = False
        self.completed = False
        self.exit_code = 255

        self.thread_count          = None
        self.cpu_count             = cpu_count()
        self.compression_method    = 'none'
        self.compression_supported = ['none']
        self.timer_name            = self.__class__.__name__

        signal(SIGINT, SIG_IGN)
        signal(SIGTERM, self.close)
test_cotyledon.py 文件源码 项目:deb-python-cotyledon 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_sigkill(self):
        self.assert_everything_has_started()
        self.subp.kill()
        time.sleep(0.5)
        lines = sorted(self.get_lines())
        lines = self.hide_pids(lines)
        self.assertEqual([
            b'ERROR:cotyledon.tests.examples:heavy terminate',
            b'ERROR:cotyledon.tests.examples:heavy terminate',
            b'INFO:cotyledon:Caught SIGTERM signal, graceful exiting of '
            b'service heavy(0) [XXXX]',
            b'INFO:cotyledon:Caught SIGTERM signal, graceful exiting of '
            b'service heavy(1) [XXXX]',
            b'INFO:cotyledon:Caught SIGTERM signal, graceful exiting of '
            b'service light(0) [XXXX]',
            b'INFO:cotyledon:Parent process has died unexpectedly, '
            b'heavy(0) [XXXX] exiting',
            b'INFO:cotyledon:Parent process has died unexpectedly, '
            b'heavy(1) [XXXX] exiting',
            b'INFO:cotyledon:Parent process has died unexpectedly, '
            b'light(0) [XXXX] exiting',
        ], lines)
        self.assert_everything_is_dead(-9)
test_cotyledon.py 文件源码 项目:deb-python-cotyledon 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_graceful_timeout_term(self):
        lines = self.get_lines(1)
        childpid = self.get_pid(lines[0])
        self.subp.terminate()
        time.sleep(2)
        self.assertEqual(0, self.subp.poll())
        self.assertRaises(OSError, os.kill, self.subp.pid, 0)
        self.assertRaises(OSError, os.kill, childpid, 0)
        lines = self.hide_pids(self.get_lines())
        self.assertNotIn('ERROR:cotyledon.tests.examples:time.sleep done',
                         lines)
        self.assertEqual([
            b'INFO:cotyledon:Caught SIGTERM signal, graceful exiting of '
            b'service buggy(0) [XXXX]',
            b'INFO:cotyledon:Graceful shutdown timeout (1) exceeded, '
            b'exiting buggy(0) [XXXX] now.',
            b'DEBUG:cotyledon:Shutdown finish'
        ], lines[-3:])
test_cotyledon.py 文件源码 项目:deb-python-cotyledon 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_graceful_timeout_kill(self):
        lines = self.get_lines(1)
        childpid = self.get_pid(lines[0])
        self.subp.kill()
        time.sleep(2)
        self.assertEqual(-9, self.subp.poll())
        self.assertRaises(OSError, os.kill, self.subp.pid, 0)
        self.assertRaises(OSError, os.kill, childpid, 0)
        lines = self.hide_pids(self.get_lines())
        self.assertNotIn('ERROR:cotyledon.tests.examples:time.sleep done',
                         lines)
        self.assertEqual([
            b'INFO:cotyledon:Parent process has died unexpectedly, buggy(0) '
            b'[XXXX] exiting',
            b'INFO:cotyledon:Caught SIGTERM signal, graceful exiting of '
            b'service buggy(0) [XXXX]',
            b'INFO:cotyledon:Graceful shutdown timeout (1) exceeded, '
            b'exiting buggy(0) [XXXX] now.',
        ], lines[-3:])
test_subprocess.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_proc_exited(self):
        waiter = asyncio.Future(loop=self.loop)
        transport, protocol = self.create_transport(waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(waiter)

        self.assertEqual(transport.get_returncode(), 6)

        self.assertTrue(protocol.connection_made.called)
        self.assertTrue(protocol.process_exited.called)
        self.assertTrue(protocol.connection_lost.called)
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))

        self.assertFalse(transport._closed)
        self.assertIsNone(transport._loop)
        self.assertIsNone(transport._proc)
        self.assertIsNone(transport._protocol)

        # methods must raise ProcessLookupError if the process exited
        self.assertRaises(ProcessLookupError,
                          transport.send_signal, signal.SIGTERM)
        self.assertRaises(ProcessLookupError, transport.terminate)
        self.assertRaises(ProcessLookupError, transport.kill)

        transport.close()
service.py 文件源码 项目:weibo 作者: windskyer 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _child_process_handle_signal(self):
        # Setup child signal handlers differently

        def _sigterm(*args):
            self.signal_handler.clear()
            self.launcher.stop()

        def _sighup(*args):
            self.signal_handler.clear()
            raise SignalExit(signal.SIGHUP)

        self.signal_handler.clear()

        # Parent signals with SIGTERM when it wants us to go away.
        self.signal_handler.add_handler('SIGTERM', _sigterm)
        self.signal_handler.add_handler('SIGHUP', _sighup)
        self.signal_handler.add_handler('SIGINT', self._fast_exit)
service.py 文件源码 项目:weibo 作者: windskyer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def stop(self):
        """Terminate child processes and wait on each."""
        self.running = False

        LOG.debug("Stop services.")
        for service in set(
                [wrap.service for wrap in self.children.values()]):
            service.stop()

        LOG.debug("Killing children.")
        for pid in self.children:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as exc:
                if exc.errno != errno.ESRCH:
                    raise

        # Wait for children to die
        if self.children:
            LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
            while self.children:
                self._wait_child()
__init__.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def send_signal(self, sig):
        """Send a signal to process pre-emptively checking whether
        PID has been reused (see signal module constants) .
        On Windows only SIGTERM is valid and is treated as an alias
        for kill().
        """
        if POSIX:
            self._send_signal(sig)
        else:  # pragma: no cover
            if sig == signal.SIGTERM:
                self._proc.kill()
            # py >= 2.7
            elif sig in (getattr(signal, "CTRL_C_EVENT", object()),
                         getattr(signal, "CTRL_BREAK_EVENT", object())):
                self._proc.send_signal(sig)
            else:
                raise ValueError(
                    "only SIGTERM, CTRL_C_EVENT and CTRL_BREAK_EVENT signals "
                    "are supported on Windows")
__init__.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def send_signal(self, sig):
        """Send a signal to process pre-emptively checking whether
        PID has been reused (see signal module constants) .
        On Windows only SIGTERM is valid and is treated as an alias
        for kill().
        """
        if POSIX:
            self._send_signal(sig)
        else:  # pragma: no cover
            if sig == signal.SIGTERM:
                self._proc.kill()
            # py >= 2.7
            elif sig in (getattr(signal, "CTRL_C_EVENT", object()),
                         getattr(signal, "CTRL_BREAK_EVENT", object())):
                self._proc.send_signal(sig)
            else:
                raise ValueError(
                    "only SIGTERM, CTRL_C_EVENT and CTRL_BREAK_EVENT signals "
                    "are supported on Windows")
popen_spawn.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def kill(self, sig):
        '''Sends a Unix signal to the subprocess.

        Use constants from the :mod:`signal` module to specify which signal.
        '''
        if sys.platform == 'win32':
            if sig in [signal.SIGINT, signal.CTRL_C_EVENT]:
                sig = signal.CTRL_C_EVENT
            elif sig in [signal.SIGBREAK, signal.CTRL_BREAK_EVENT]:
                sig = signal.CTRL_BREAK_EVENT
            else:
                sig = signal.SIGTERM

        os.kill(self.proc.pid, sig)
adb_logcat_printer.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ShutdownLogcatMonitor(base_dir, logger):
  """Attempts to shutdown adb_logcat_monitor and blocks while waiting."""
  try:
    monitor_pid_path = os.path.join(base_dir, 'LOGCAT_MONITOR_PID')
    with open(monitor_pid_path) as f:
      monitor_pid = int(f.readline())

    logger.info('Sending SIGTERM to %d', monitor_pid)
    os.kill(monitor_pid, signal.SIGTERM)
    i = 0
    while True:
      time.sleep(.2)
      if not os.path.exists(monitor_pid_path):
        return
      if not os.path.exists('/proc/%d' % monitor_pid):
        logger.warning('Monitor (pid %d) terminated uncleanly?', monitor_pid)
        return
      logger.info('Waiting for logcat process to terminate.')
      i += 1
      if i >= 10:
        logger.warning('Monitor pid did not terminate. Continuing anyway.')
        return

  except (ValueError, IOError, OSError):
    logger.exception('Error signaling logcat monitor - continuing')
daemon.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Stop the daemon
        """
        # Get the pid from the pidfile
        try:
            pf = file(self.pidfile,'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None

        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return # not an error in a restart

        # Try killing the daemon process       
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError, err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print str(err)
                sys.exit(1)
daemon.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Stop the daemon
        """
        # Get the pid from the pidfile
        try:
            pf = file(self.pidfile,'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None

        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return # not an error in a restart

        # Try killing the daemon process       
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError, err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print str(err)
                sys.exit(1)
daemon.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def stop(self):
        try:
            pf = file(self.pidfile,'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None

        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return # not an error in a restart

        print("killing process with pid {0}".format(pid))
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError, err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print str(err)
                sys.exit(1)
kas.py 文件源码 项目:kas 作者: siemens 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def interruption():
    """
        Ignore SIGINT/SIGTERM in kas, let them be handled by our sub-processes
    """
    pass
test_09_DomainPersistence.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_EventAppPortConnectionSIGTERMNoPersist(self):
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._nb_domMgr, domMgr = self.launchDomainManager("--nopersist", endpoint="giop:tcp::5679", dbURI=self._dbfile)
        self._nb_devMgr, devMgr = self.launchDeviceManager("/nodes/test_EventPortTestDevice_node/DeviceManager.dcd.xml")

        domainName = scatest.getTestDomainName()
        domMgr.installApplication("/waveforms/PortConnectFindByDomainFinderEvent/PortConnectFindByDomainFinderEvent.sad.xml")
        appFact = domMgr._get_applicationFactories()[0]
        app = appFact.create(appFact._get_name(), [], [])
        app.start()

        # Kill the domainMgr
        os.kill(self._nb_domMgr.pid, signal.SIGTERM)
        if not self.waitTermination(self._nb_domMgr, 5.0):
            self.fail("Domain Manager Failed to Die")

        # Restart the Domain Manager (which should restore the old channel)
        self._nb_domMgr, domMgr = self.launchDomainManager("--nopersist", endpoint="giop:tcp::5679", dbURI=self._dbfile)

        newappFact = domMgr._get_applicationFactories()
        self.assertEqual(len(newappFact), 0)

        apps = domMgr._get_applications()
        self.assertEqual(len(apps), 0)

        devMgrs = domMgr._get_deviceManagers()
        self.assertEqual(len(devMgrs), 0)
test_09_DomainPersistence.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_DeviceManagerDisappear(self):
        self._nb_domMgr, self._domMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
        self._nb_devMgr, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")

        self.assertEqual(len(self._domMgr._get_applicationFactories()), 0)
        self.assertEqual(len(self._domMgr._get_applications()), 0)

        self._domMgr.installApplication("/waveforms/CommandWrapper/CommandWrapper.sad.xml")
        self.assertEqual(len(self._domMgr._get_applicationFactories()), 1)
        self.assertEqual(len(self._domMgr._get_applications()), 0)

        # Ensure the expected device is available
        self.assertNotEqual(devMgr, None)
        self.assertEqual(len(devMgr._get_registeredDevices()), 1)
        device = devMgr._get_registeredDevices()[0]

        # Kill the domainMgr and device manager
        os.kill(self._nb_domMgr.pid, signal.SIGKILL)
        if not self.waitTermination(self._nb_domMgr):
            self.fail("Domain Manager Failed to Die")

        os.kill(self._nb_devMgr.pid, signal.SIGTERM)
        if not self.waitTermination(self._nb_devMgr):
            self.fail("Device Manager Failed to Die")

        # Start the domainMgr again
        self._nb_domMgr, newDomMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)

        # Verify our client reference still is valid
        self.assertEqual(False, newDomMgr._non_existent())
        self.assertEqual(newDomMgr._get_identifier(),'DCE:5f52f645-110f-4142-8cc9-4d9316ddd958')
        self.assertEqual(self._domMgr._get_identifier(),'DCE:5f52f645-110f-4142-8cc9-4d9316ddd958')
        self.assertEqual(False, self._domMgr._non_existent())

        self.assertEqual(len(self._domMgr._get_deviceManagers()), 0)
        self.assertEqual(len(self._domMgr._get_applicationFactories()), 1)


问题


面经


文章

微信
公众号

扫码关注公众号