python类SIGKILL的实例源码

test_01_DeviceManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_CatatrophicUnregister(self):
        # Test that if a device manager dies unexpectedly and then re-registers there are no problems.
        # Flow: launch the node, SIGKILL everything getChildren() reports for the node booter,
        # SIGKILL the node booter itself, relaunch the same node and check the device count again.
        dcd_path = "/nodes/test_SelfTerminatingDevice_node/DeviceManager.dcd.xml"
        devmgr_nb, devMgr = self.launchDeviceManager(dcd_path)
        self.assertNotEqual(devMgr, None)

        # NOTE These assert check must be kept in-line with the DeviceManager.dcd.xml
        self.assertEqual(len(devMgr._get_registeredDevices()), 1)

        # Kill the processes reported for the node booter first, then the node booter itself
        for devpid in getChildren(devmgr_nb.pid):
            os.kill(devpid, signal.SIGKILL)
        os.kill(devmgr_nb.pid, signal.SIGKILL)

        self.waitTermination(devmgr_nb)

        # Relaunch the very same node; it must register exactly one device again
        devmgr_nb, devMgr = self.launchDeviceManager(dcd_path)
        self.assertNotEqual(devMgr, None)
        self.assertEqual(len(devMgr._get_registeredDevices()), 1)

        # Clean shutdown of the relaunched node
        devMgr.shutdown()

        # assertTrue replaces the deprecated assert_ alias
        self.assertTrue(self.waitTermination(devmgr_nb), "Nodebooter did not die after shutdown")
test_09_DomainPersistence.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_DeviceManagerSurprise(self):
        # Scenario: the Domain Manager is SIGKILLed, a Device Manager is started while no
        # Domain Manager is running, then the Domain Manager is restarted on the same endpoint
        # and database URI. The device must become resolvable and one Device Manager must be listed.
        self._nb_domMgr, self._domMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
        self.assertEqual(len(self._domMgr._get_deviceManagers()), 0)

        # Kill the domainMgr
        os.kill(self._nb_domMgr.pid, signal.SIGKILL)
        if not self.waitTermination(self._nb_domMgr):
            self.fail("Domain Manager Failed to Die")

        # Launched with wait=False: no Domain Manager is running at this point
        self._nb_devMgr, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", wait=False)
        # this sleep is needed to allow the Device Manager to figure out that the Domain Manager is not available
        time.sleep(1)

        # Start the domainMgr again (same endpoint and dbURI as the first launch)
        self._nb_domMgr, newDomMgr = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
        time.sleep(1) # sleep needed to make sure that the Device Manager has registered with the Domain Manager

        # Resolve the device by its "<domain>/<node>/BasicTestDevice1" name through self._root
        node_name = 'BasicTestDevice_node'
        domainName = scatest.getTestDomainName()
        devMgrURI = URI.stringToName("%s/%s/BasicTestDevice1" % (domainName, node_name))
        dev = self._root.resolve(devMgrURI)
        self.assertNotEqual(dev, None)

        # NOTE(review): this queries self._domMgr (the reference obtained before the kill), not
        # newDomMgr, which is otherwise unused - presumably intentional to exercise the old
        # reference against the restarted manager; confirm.
        self.assertEqual(len(self._domMgr._get_deviceManagers()), 1)
gdb.py 文件源码 项目:seb 作者: second-reality 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def kill(self):
        """SIGKILL and reap the tracked qemu process, then let gdb detach from it.

        Does nothing when ``self.qemu`` is falsy. ``self.qemu`` is cleared and
        ``self.current_qemu_pid`` is reset to -1 whether or not the kill
        succeeded.

        Raises:
            RuntimeError: if the gdb ``detach`` command does not raise
                ``gdb.error``.
        """
        if not self.qemu:
            return

        self.qemu = None
        try:
            os.kill(self.current_qemu_pid, signal.SIGKILL)
            os.waitpid(self.current_qemu_pid, 0)
        except OSError: # process may be finished by kill already
            pass
        finally:
            # self.qemu is already cleared, so this method can never retry;
            # do not leave a stale pid behind when kill/waitpid failed.
            self.current_qemu_pid = -1

        LOG.debug('let gdb notice process was killed')
        try:
            execute_gdb_command('detach')
        except gdb.error as e:
            LOG.debug('catch expected exception: ' + str(e))
        else:
            raise RuntimeError('gdb should have disconnected and raise gdb.error')
manager.py 文件源码 项目:virtualbmc 作者: umago 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop(self, domain_name):
        """Stop the Virtual BMC of ``domain_name`` by SIGKILLing the PID in its pid file.

        The pid file is ``<utils.CONFIG_PATH>/<domain_name>/pid``; it is removed
        once it has been read successfully.

        Raises:
            exception.DomainNotFound: the domain directory does not exist.
            exception.VirtualBMCError: the pid file cannot be read or does not
                contain an integer.
        """
        LOG.debug('Stopping Virtual BMC for domain %s', domain_name)
        domain_path = os.path.join(utils.CONFIG_PATH, domain_name)
        if not os.path.exists(domain_path):
            raise exception.DomainNotFound(domain=domain_name)

        pidfile_path = os.path.join(domain_path, 'pid')
        pid = None
        try:
            with open(pidfile_path, 'r') as f:
                pid = int(f.read())
        except (IOError, ValueError):
            # ValueError: the file exists but does not hold an integer
            raise exception.VirtualBMCError(
                'Error stopping the domain %s: PID file not '
                'found' % domain_name)
        else:
            os.remove(pidfile_path)

        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # best effort: ignore a failed kill (e.g. the process is already gone)
            pass
android_handset.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def kill_cmd(self):
        """Close the command's file descriptor, then SIGKILL and reap its process.

        Does nothing unless ``self.cmd_pid`` is greater than 1. Afterwards
        ``self.cmd_pid`` is set to -1.

        Raises:
            OSError: closing ``self.cmd_fd`` failed for a reason other than EBADF.
            Exception: kill/waitpid failed with an errno other than ECHILD/ESRCH.
        """
        if self.cmd_pid > 1:
            try:
                os.close(self.cmd_fd)
            except OSError as e:
                if e.errno != errno.EBADF: # EBADF means already closed
                    raise # bare raise keeps the original traceback
            try:
                os.kill(self.cmd_pid, signal.SIGKILL)
                os.waitpid(self.cmd_pid, 0)
            except OSError as e:
                # ECHILD / ESRCH: nothing left to kill or reap
                if e.errno not in [errno.ECHILD, errno.ESRCH]:
                    raise Exception('unhandled errno: %d' % e.errno)
            self.cmd_pid = -1
android_handset.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def kill_cmd(self):
        """Close the command's file descriptor, then SIGKILL and reap its process.

        Does nothing unless ``self.cmd_pid`` is greater than 1. Afterwards
        ``self.cmd_pid`` is set to -1.

        Raises:
            OSError: closing ``self.cmd_fd`` failed for a reason other than EBADF.
            Exception: kill/waitpid failed with an errno other than ECHILD/ESRCH.
        """
        if self.cmd_pid > 1:
            try:
                os.close(self.cmd_fd)
            except OSError as e:
                if e.errno != errno.EBADF: # EBADF means already closed
                    raise # bare raise keeps the original traceback
            try:
                os.kill(self.cmd_pid, signal.SIGKILL)
                os.waitpid(self.cmd_pid, 0)
            except OSError as e:
                # ECHILD / ESRCH: nothing left to kill or reap
                if e.errno not in [errno.ECHILD, errno.ESRCH]:
                    raise Exception('unhandled errno: %d' % e.errno)
            self.cmd_pid = -1
_broker.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def shutdown(self, details=None):
        """Terminate the broker's helper processes and sessions, then shut down.

        Calls ``terminate()`` on each helper that is set (``hsl``, ``brl``,
        ``wlan_lister``, ``pm_lister``), stops sharing, terminates the local
        side of every session and finally delegates to ``Control.shutdown``.

        Raises:
            Exception: terminating a session failed with an errno other than
                ECHILD/ESRCH.
        """
        # terminate all processes started by the broker
        if self.hsl:
            self.hsl.terminate()
        if self.brl:
            self.brl.terminate()
        if self.wlan_lister:
            self.wlan_lister.terminate()
        if self.pm_lister:
            self.pm_lister.terminate()
        self.stop_sharing()
        for session in self.sessions.values():
            try:
                # send SIGTERM, not SIGKILL, so that Session.shutdown() runs
                session[LOCAL].terminate()
            except OSError as e:
                # ECHILD / ESRCH: the session process is already gone
                if e.errno not in [errno.ECHILD, errno.ESRCH]:
                    raise Exception('unhandled errno: %d' % e.errno)
        Control.shutdown(self, details) # does not return. always do last
cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def t4():
    """Check that the output of a background command can be read after poll().

    Runs ``echo hello`` through ``ave.cmd.run_bg``, polls the returned file
    descriptor for up to one second and expects the data read to start with
    ``hello``. The child is always killed and reaped before returning.

    Returns:
        True when the check passed, False when a FAIL line was printed.
    """
    pretty = '%s t4' % __file__
    print(pretty)

    pid, fd = ave.cmd.run_bg('echo hello')
    ok      = True
    try:
        poller = select.poll()
        poller.register(fd, select.POLLIN)
        events = poller.poll(1000) # milliseconds
        tmp    = b'' # os.read() returns bytes on Python 3 (same as str on Python 2)
        for e in events:
            if not (e[1] & select.POLLIN):
                print('FAIL %s: unexpected poll event: %d' % (pretty, e[1]))
                ok = False
                break # do not read from a descriptor that is not readable
            tmp += os.read(fd, 1024)
        if ok and not tmp.startswith(b'hello'):
            print('FAIL %s: wrong result: "%s"' % (pretty, tmp))
            ok = False
    finally:
        # kill and reap exactly once, whatever happened above
        os.kill(pid, signal.SIGKILL)
        os.waitpid(pid, 0)
    return ok

# check that return value from executed program is correct
_broker.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def new_session(self, authkey):
        """Start a new Session process for ``authkey`` and connect to it.

        On success the ``(session, remote)`` pair is stored in
        ``self.sessions[authkey]``. If connecting fails, an error is printed,
        the session process is sent SIGKILL and nothing is stored.

        Raises:
            Exception: a session is already registered for ``authkey``.
        """
        if authkey in self.sessions:
            raise Exception('INTERNAL ERROR: session already added for authkey')
        sock, port = find_free_port()
        session    = Session(
            port, authkey, self.address, sock, self.ws_cfg, self.home,
            self.config['logging']
        )
        session.start() # new process!
        self.join_later(session)
        remote     = RemoteSession((self.address[0], port), authkey)
        # connect to the new session and add the connection to event tracking
        try:
            self.add_connection(remote.connect(5), authkey)
        except Exception as e:
            print('ERROR: could not connect to new session: %s' % str(e))
            session.kill(signal.SIGKILL) # not much else to do
            return
        self.sessions[authkey] = (session, remote)

    # override callback defined by Control class
cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def t4():
    """Check that the output of a background command can be read after poll().

    Runs ``echo hello`` through ``ave.cmd.run_bg``, polls the returned file
    descriptor for up to one second and expects the data read to start with
    ``hello``. The child is always killed and reaped before returning.

    Returns:
        True when the check passed, False when a FAIL line was printed.
    """
    pretty = '%s t4' % __file__
    print(pretty)

    pid, fd = ave.cmd.run_bg('echo hello')
    ok      = True
    try:
        poller = select.poll()
        poller.register(fd, select.POLLIN)
        events = poller.poll(1000) # milliseconds
        tmp    = b'' # os.read() returns bytes on Python 3 (same as str on Python 2)
        for e in events:
            if not (e[1] & select.POLLIN):
                print('FAIL %s: unexpected poll event: %d' % (pretty, e[1]))
                ok = False
                break # do not read from a descriptor that is not readable
            tmp += os.read(fd, 1024)
        if ok and not tmp.startswith(b'hello'):
            print('FAIL %s: wrong result: "%s"' % (pretty, tmp))
            ok = False
    finally:
        # kill and reap exactly once, whatever happened above
        os.kill(pid, signal.SIGKILL)
        os.waitpid(pid, 0)
    return ok

# check that return value from executed program is correct
process.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def terminate(self):
        """Send SIGKILL to the traced process and mark it as not running.

        Does nothing when ``self.running`` is false. A stopped process is
        continued with SIGKILL, otherwise it is killed with SIGKILL. Unless the
        signal call failed with ESRCH, ``waitExit()`` is called before
        ``_notRunning()``.

        Raises:
            PtraceError: the signal call failed with an errno other than ESRCH.
        """
        if not self.running:
            return
        warning("Terminate %s" % self)
        done = False
        try:
            if self.is_stopped:
                self.cont(SIGKILL)
            else:
                self.kill(SIGKILL)
        except PtraceError as event:
            if event.errno == ESRCH:
                # ESRCH: skip waitExit() below
                done = True
            else:
                raise # bare raise keeps the original traceback
        if not done:
            self.waitExit()
        self._notRunning()
commands.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def follow(self, lines=20):
        """Yield lines of the instance log as ``tail -n <lines> -f`` produces them.

        When the instance does not exist, a single explanatory message is
        yielded instead. Lines are yielded without their trailing newline and
        carriage return. The ``tail`` process is sent SIGKILL when the
        generator finishes or is closed.
        """
        if not self.exists:
            yield "Instance does not exist."
            return

        tail = popen("tail", "-n", str(lines), "-f", self.logpath)
        pending = set([tail.stdout, tail.stderr])

        try:
            while self.is_running and pending:
                # wake up every half second so is_running gets re-checked
                ready = select.select(pending, (), (), 0.5)[0]
                for source in ready:
                    text = source.readline()
                    if text:
                        yield text.rstrip("\n").rstrip("\r")
                    else:
                        # an empty read means this stream hit end-of-file
                        pending.discard(source)
        finally:
            send_signal(tail.pid, signal.SIGKILL)
test_iutils.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testOutputSignal(self):
        # Use SIGKILL here because it's guaranteed to be delivered. Using
        # SIGHUP might not work in, e.g., a buildbot slave run under the
        # 'nohup' command.
        exe = sys.executable
        # The child writes one line to each stream, flushes, then kills itself
        scriptFile = self.makeSourceFile([
            "import sys, os, signal",
            "sys.stdout.write('stdout bytes\\n')",
            "sys.stderr.write('stderr bytes\\n')",
            "sys.stdout.flush()",
            "sys.stderr.flush()",
            "os.kill(os.getpid(), signal.SIGKILL)"
            ])

        def gotOutputAndValue(failure):
            # The errback argument carries (stdout, stderr, signal) in .value
            out, err, sig = failure.value # XXX Sigh wtf
            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(out, "stdout bytes" + os.linesep)
            self.assertEqual(err, "stderr bytes" + os.linesep)
            self.assertEqual(sig, signal.SIGKILL)

        d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile])
        return d.addErrback(gotOutputAndValue)
manager.py 文件源码 项目:virtualbmc 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def stop(self, domain_name):
        """Stop the Virtual BMC of ``domain_name`` by SIGKILLing the PID in its pid file.

        The pid file is ``<self.config_dir>/<domain_name>/pid``; it is removed
        once it has been read successfully. A failing ``os.kill`` is ignored.

        Raises:
            exception.DomainNotFound: the domain directory does not exist.
            exception.VirtualBMCError: the pid file cannot be read or does not
                contain an integer.
        """
        LOG.debug('Stopping Virtual BMC for domain %s', domain_name)
        domain_dir = os.path.join(self.config_dir, domain_name)
        if not os.path.exists(domain_dir):
            raise exception.DomainNotFound(domain=domain_name)

        pid_file = os.path.join(domain_dir, 'pid')
        try:
            with open(pid_file, 'r') as handle:
                bmc_pid = int(handle.read())
        except (IOError, ValueError):
            raise exception.VirtualBMCError(
                'Error stopping the domain %s: PID file not '
                'found' % domain_name)

        # only reached when the pid was read successfully
        os.remove(pid_file)

        try:
            os.kill(bmc_pid, signal.SIGKILL)
        except OSError:
            pass
ptyprocess.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def close(self, force=True):
        '''Close the connection with the child application.

        Calling close() again on an already closed object is a no-op, like
        closing a file twice. After flushing and closing the file object the
        method sleeps for ``self.delayafterclose`` and, if the child is still
        alive, calls ``self.terminate(force)``.

        Raises PtyProcessError when terminate() reports failure. '''
        if self.closed:
            return

        self.flush()
        self.fileobj.close() # Closes the file descriptor
        # Give kernel time to update process status.
        time.sleep(self.delayafterclose)
        if self.isalive() and not self.terminate(force):
            raise PtyProcessError('Could not terminate the child.')
        self.fd = -1
        self.closed = True
hot-restarter.py 文件源码 项目:ambassador 作者: datawire 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def force_kill_all_children():
  """ Iterate through all known child processes and force kill them. In the future we might consider
      possibly giving the child processes time to exit but this is fine for now. If someone force kills
      us and does not clean the process tree this will leave child processes around unless they choose
      to end themselves if their parent process dies.

      A failing kill is reported and skipped; the global pid_list is emptied afterwards. """

  # First uninstall the SIGCHLD handler so that we don't get called again.
  signal.signal(signal.SIGCHLD, signal.SIG_DFL)

  global pid_list
  for pid in pid_list:
    print ("force killing PID={}".format(pid))
    try:
      os.kill(pid, signal.SIGKILL)
    except OSError:
      # Only OS-level failures (e.g. the process is already gone) are expected here; a bare
      # except would also swallow KeyboardInterrupt and SystemExit.
      print ("error force killing PID={} continuing".format(pid))

  pid_list = []
task.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
async def kill(pid, sleep_time=1):
    """Kill ``pid`` with various signals.

    Sends SIGINT, then SIGTERM, then SIGKILL repeatedly. After each signal it
    sleeps ``sleep_time`` seconds and probes the pid with signal 0; it returns
    as soon as signalling or probing raises ``OSError``.

    Args:
        pid (int): the process id to kill.
        sleep_time (int, optional): how long to sleep between killing the pid
            and checking if the pid is still running.

    """
    siglist = [signal.SIGINT, signal.SIGTERM]
    while True:
        # escalate through siglist, then keep using SIGKILL
        sig = signal.SIGKILL
        if siglist:  # pragma: no branch
            sig = siglist.pop(0)
        try:
            os.kill(pid, sig)
            await asyncio.sleep(sleep_time)
            os.kill(pid, 0)  # signal 0 only checks that the pid can be signalled
        except OSError:  # includes ProcessLookupError
            return


# max_timeout {{{1
arbiter.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def stop(self, graceful=True):
        """\
        Stop workers

        :attr graceful: boolean, when True (the default) workers are sent
        SIGTERM, otherwise SIGQUIT. Workers still registered once
        ``cfg.graceful_timeout`` seconds have passed are sent SIGKILL.
        """
        self.LISTENERS = []
        sig = signal.SIGTERM if graceful else signal.SIGQUIT
        deadline = time.time() + self.cfg.graceful_timeout

        # ask the workers to exit, then poll until they are gone or time is up
        self.kill_workers(sig)
        while self.WORKERS and time.time() < deadline:
            time.sleep(0.1)

        self.kill_workers(signal.SIGKILL)
arbiter.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def murder_workers(self):
        """\
        Kill workers whose last update is older than ``self.timeout``

        Does nothing when ``self.timeout`` is falsy. A timed-out worker is
        first flagged as aborted and sent SIGABRT; one that is already flagged
        is sent SIGKILL.
        """
        if not self.timeout:
            return
        for pid, worker in list(self.WORKERS.items()):
            try:
                fresh = time.time() - worker.tmp.last_update() <= self.timeout
            except (OSError, ValueError):
                # the update time could not be read; skip this worker
                continue
            if fresh:
                continue

            if worker.aborted:
                self.kill_worker(pid, signal.SIGKILL)
                continue

            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            self.kill_worker(pid, signal.SIGABRT)
supervise.py 文件源码 项目:supervise 作者: catern 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __handle_event(self, msg, code):
        """Update this object's state from one (msg, code) event."""
        if msg == b"pid":
            # starting up
            self.pid = code
        elif msg == b"exited":
            # main child process exiting normally
            self.returncode = code
        elif msg in (b"killed", b"dumped"):
            # main child process was signalled: store the negated code
            self.returncode = -code
        elif msg == b"no_children":
            # notification about no children
            self.childfree = True
        elif msg in (b"terminating", b""):
            # supervise exiting, in one of two ways
            if msg == b"terminating":
                # normal termination
                self.childfree = True
            else:
                # hangup! This can only happen if supervise was SIGKILL'd (or worse)
                self.hangup = True
            self.close()
connection_timer.py 文件源码 项目:universe 作者: openai 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def measure_clock_skew(label, host):
    """Spawn ``ntpdate -q -p 8 <host>`` and return the Clockskew's deferred.

    The process is spawned from /usr/sbin/ntpdate with an empty environment.
    A timer is scheduled after UNIVERSE_NTPDATE_TIMEOUT seconds (default 20);
    if the process still has a pid at that point, it is sent SIGKILL and
    reaped.
    """
    cmd = ['ntpdate', '-q', '-p', '8', host]
    extra_logger.info('[%s] Starting network calibration with %s', label, ' '.join(cmd))
    skew = Clockskew(label, cmd)
    # TODO: search PATH for this?
    process = reactor.spawnProcess(skew, '/usr/sbin/ntpdate', cmd, {})

    limit = float(os.environ.get('UNIVERSE_NTPDATE_TIMEOUT', 20))

    def on_timeout():
        # a falsy pid means there is nothing left to kill
        if not process.pid:
            return
        logger.error('[%s] %s call timed out after %ss; killing the subprocess. This is ok, but you could have more accurate timings by enabling UDP port 123 traffic to your env. (Alternatively, you can try increasing the timeout by setting environment variable UNIVERSE_NTPDATE_TIMEOUT=10.)', label, ' '.join(cmd), limit)
        process.signalProcess(signal.SIGKILL)
        process.reapProcess()

    # TODO: make this part of the connection string
    reactor.callLater(limit, on_timeout)
    return skew.deferred
heartbreaker.py 文件源码 项目:heartbreaker 作者: lokori 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def killProc(self):
        """Best-effort teardown of the global ``proc`` and of the marker files.

        Tries, independently of each other, to terminate ``proc``, poll it and
        delete the global name; ordinary errors in any of these steps are
        ignored. Then removes ``.target_fail`` and ``.target_resp`` from the
        current directory if they exist.
        """
        global proc
        # Each step is attempted on its own. `except Exception` keeps the
        # best-effort behaviour (including an undefined `proc`) without
        # swallowing KeyboardInterrupt / SystemExit like a bare except does.
        try:
            proc.terminate()
        except Exception:
            pass
        try:
            proc.poll()
        except Exception:
            pass
        try:
            del proc
        except Exception:
            pass
        for marker in ('.target_fail', '.target_resp'):
            if os.path.isfile(marker):
                os.remove(marker)
ptyprocess.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def close(self, force=True):
        '''Close the connection with the child application.

        Calling close() again on an already closed object is a no-op, like
        closing a file twice. After flushing and closing the file object the
        method sleeps for ``self.delayafterclose`` and, if the child is still
        alive, calls ``self.terminate(force)``.

        Raises PtyProcessError when terminate() reports failure. '''
        if self.closed:
            return

        self.flush()
        self.fileobj.close() # Closes the file descriptor
        # Give kernel time to update process status.
        time.sleep(self.delayafterclose)
        if self.isalive() and not self.terminate(force):
            raise PtyProcessError('Could not terminate the child.')
        self.fd = -1
        self.closed = True
test_processkiller.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_accept_small_start_time_skews(self, kill_mock):
        """
        The boot time isn't very precise, so a small skew between the
        requested and the computed process start time must be accepted.
        """
        self.manager.add(self.signaller)
        # Process 100 is created with started_after_boot=10 ...
        self.builder.create_data(100, self.builder.RUNNING,
                                 uid=1000, gid=1000, started_after_boot=10,
                                 process_name="ooga")

        # ... while the request names start-time 21; the kill must still be issued.
        request = {"type": "signal-process",
                   "operation-id": 1,
                   "pid": 100, "name": "ooga",
                   "start-time": 21, "signal": "KILL"}
        self.manager.dispatch_message(request)

        kill_mock.assert_called_once_with(100, signal.SIGKILL)
module_common.py 文件源码 项目:SiMon 作者: maxwelltsai 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def sim_kill(self):
        """
        Forcibly kill the current simulation by sending SIGKILL to the PID stored in the
        'Simulation' section of the configuration. The outcome is printed and, when a logger is
        set, logged (info on success, warning when the kill raised OSError).

        :return: Always 0, including when no PID is configured or the kill failed.
        NOTE(review): earlier documentation promised -1 on failure and 1 when the simulation is
        not running, but the code has never returned those values - confirm the intended contract
        before relying on the return value.
        """
        # Find the process by PID
        if self.config.has_option('Simulation', 'PID'):
            pid = self.config.getint('Simulation', 'PID')
            try:
                os.kill(pid, signal.SIGKILL)
                msg = 'Simulation %s (PID: %d) killed.' % (self.name, pid)
                print(msg)
                if self.logger is not None:
                    self.logger.info(msg)
            except OSError as err:
                msg = '%s: Cannot kill the process: %s\n' % (str(err),  self.name)
                print(msg)
                if self.logger is not None:
                    self.logger.warning(msg)
        return 0
jobs.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def terminate(self, sigkill=False):
        """
        Terminate the process launched to process the file, optionally
        following up with SIGKILL.

        :param sigkill: whether to issue a SIGKILL if the process is still
            alive after terminate() and a 5 second join.
        :type sigkill: bool
        :raises AirflowException: if no process has been started yet.
        """
        proc = self._process
        if proc is None:
            raise AirflowException("Tried to call stop before starting!")

        # The queue will likely get corrupted, so remove the reference
        self._result_queue = None
        proc.terminate()
        # Arbitrarily wait 5s for the process to die
        proc.join(5)

        if not sigkill or not proc.is_alive():
            return
        self.log.warning("Killing PID %s", proc.pid)
        os.kill(proc.pid, signal.SIGKILL)
test_helpers.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_kill_process_tree(self):
        """Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway."""
        child_process_killed = multiprocessing.Value('i', 0)
        process_done = multiprocessing.Semaphore(0)
        child_pid = multiprocessing.Value('i', 0)
        setup_done = multiprocessing.Semaphore(0)
        args = [child_process_killed, child_pid, process_done, setup_done]
        child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
        try:
            child.start()
            # The helper must signal completion within 5 seconds and report the kill
            self.assertTrue(process_done.acquire(timeout=5.0))
            self.assertEqual(1, child_process_killed.value)
        finally:
            # child_pid is initialised to 0 and presumably filled in by the helper process.
            # os.kill(0, ...) would signal every process in our own process group (including
            # this test runner), so only clean up when a real pid was recorded.
            leftover_pid = child_pid.value
            if leftover_pid > 0:
                try:
                    os.kill(leftover_pid, signal.SIGKILL) # terminate doesnt work here
                except OSError:
                    pass
arbiter.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def stop(self, graceful=True):
        """\
        Stop workers

        :attr graceful: boolean, when True (the default) workers are sent
        SIGTERM, otherwise SIGQUIT. Workers still registered once
        ``cfg.graceful_timeout`` seconds have passed are sent SIGKILL.
        """
        # close every listener before dropping the references
        for listener in self.LISTENERS:
            listener.close()
        self.LISTENERS = []

        sig = signal.SIGTERM if graceful else signal.SIGQUIT
        deadline = time.time() + self.cfg.graceful_timeout

        # ask the workers to exit, then poll until they are gone or time is up
        self.kill_workers(sig)
        while self.WORKERS and time.time() < deadline:
            time.sleep(0.1)

        self.kill_workers(signal.SIGKILL)
arbiter.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def murder_workers(self):
        """\
        Kill workers whose last update is older than ``self.timeout``

        Does nothing when ``self.timeout`` is falsy. A timed-out worker is
        first flagged as aborted and sent SIGABRT; one that is already flagged
        is sent SIGKILL.
        """
        if not self.timeout:
            return
        for pid, worker in list(self.WORKERS.items()):
            try:
                fresh = time.time() - worker.tmp.last_update() <= self.timeout
            except (OSError, ValueError):
                # the update time could not be read; skip this worker
                continue
            if fresh:
                continue

            if worker.aborted:
                self.kill_worker(pid, signal.SIGKILL)
                continue

            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            self.kill_worker(pid, signal.SIGABRT)
validator_manager.py 文件源码 项目:sawtooth-validator 作者: hyperledger-archives 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def shutdown(self, force=False):
        """Signal the managed process (if still running) and close its output streams.

        With ``force`` the process is killed (``kill()`` on Windows, SIGKILL
        elsewhere); otherwise it is asked to stop (``terminate()`` on Windows,
        SIGINT elsewhere). OSError raised while signalling is ignored.
        ``self._output`` and ``self._outerr`` are closed when set and open.
        """
        if self._handle:
            self._handle.poll()
            # returncode stays None only while the process is running; an exit
            # status of 0 is falsy too, so it must be compared with `is None`.
            if self._handle.returncode is None:
                on_windows = os.name == "nt"
                try:
                    if force:
                        if on_windows:
                            self._handle.kill()
                        else:
                            self._handle.send_signal(signal.SIGKILL)
                    else:
                        if on_windows:
                            self._handle.terminate()
                        else:
                            self._handle.send_signal(signal.SIGINT)
                except OSError:
                    pass  # ignore invalid process and other os type errors
        if self._output and not self._output.closed:
            self._output.close()
        if self._outerr and not self._outerr.closed:
            self._outerr.close()


问题


面经


文章

微信
公众号

扫码关注公众号