python类getpgid()的实例源码

mx.py 文件源码 项目:mx 作者: graalvm 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _kill_process(pid, sig):
    """
    Sends the signal `sig` to the process identified by `pid`. If `pid` is a process group
    leader, then signal is sent to the process group id.

    Returns True when the signal was sent. Returns False (after logging the
    error) when looking up the process group or sending the signal fails.
    """
    try:
        logvv('[{} sending {} to {}]'.format(os.getpid(), sig, pid))
        # The lookup lives inside the try block: the target may already have
        # exited, and that must be reported like any other failed kill
        # instead of escaping as an unhandled OSError.
        pgid = os.getpgid(pid)
        if pgid == pid:
            os.killpg(pgid, sig)
        else:
            os.kill(pid, sig)
        return True
    except Exception as e:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # should still propagate to the caller.
        log('Error killing subprocess ' + str(pid) + ': ' + str(e))
        return False
__init__.py 文件源码 项目:phuzz 作者: XiphosResearch 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def stop(self):
        """Shut down the running process (if any) and close the log file handle.

        The process is asked to terminate first. If that raises OSError and
        `self.sudo_kill` is set, its whole process group is sent SIGTERM
        through `sudo pkill` instead. Both `self.proc` and `self.logfh` are
        reset to None once they have been dealt with.
        """
        tracer = self.proc
        if tracer:
            try:
                tracer.terminate()
            except OSError:
                if self.sudo_kill:
                    group_id = os.getpgid(tracer.pid)
                    kill_cmd = ['sudo', 'pkill', '-TERM', '-g', str(group_id)]
                    subprocess.check_call(kill_cmd)
            wait_for_proc_death(tracer)
            LOG.debug('SyscallTracer stopped, pid: %r', tracer.pid)
            self.proc = None

        log_handle = self.logfh
        if log_handle:
            log_handle.close()
            self.logfh = None
__init__.py 文件源码 项目:aetros-cli 作者: aetros 项目源码 文件源码 阅读 138 收藏 0 点赞 0 评论 0
def raise_sigint():
    """
    Deliver an interrupt to this process and to its sub-processes.

    A plain os.kill() reaches only the current process, whereas CTRL+C on a
    console reaches the whole process group. This helper reproduces the
    latter behaviour.
    """
    own_pid = os.getpid()

    if hasattr(signal, 'CTRL_C_EVENT'):
        # Windows: CTRL_C_EVENT is what raises the signal in the whole group.
        os.kill(own_pid, signal.CTRL_C_EVENT)
        return

    # Unix.
    pgid = os.getpgid(own_pid)
    if pgid == 1:
        # NOTE(review): presumably avoids signalling the init process group
        # (e.g. inside a container) -- confirm the intent.
        os.kill(own_pid, signal.SIGINT)
        return

    os.killpg(pgid, signal.SIGINT)
start_controller.py 文件源码 项目:parsl 作者: Parsl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def close(self):
        ''' Terminate the controller process and its child processes.

        Does nothing (beyond a debug log) when `self.reuse` is set. Otherwise
        the controller's process group is sent SIGTERM, then SIGKILL 0.2
        seconds later, and the process is given up to one second to be reaped.
        Any failure is logged as a warning and not raised.

        Args:
              - None
        '''
        if self.reuse:
            logger.debug("Ipcontroller not shutting down: reuse enabled")
            return

        try:
            pgid = os.getpgid(self.proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            # Short grace period before the group is killed unconditionally.
            time.sleep(0.2)
            os.killpg(pgid, signal.SIGKILL)
            try:
                self.proc.wait(timeout=1)
                x = self.proc.returncode
                logger.debug("Controller exited with {0}".format(x))
            except subprocess.TimeoutExpired:
                # `warning` replaces the deprecated `warn` alias.
                logger.warning("Ipcontroller process:{0} cleanup failed. May require manual cleanup".format(self.proc.pid))

        except Exception as e:
            logger.warning("Failed to kill the ipcontroller process[{0}]: {1}".format(self.proc.pid,
                                                                                      e))
pipeline.py 文件源码 项目:Comparative-Annotation-Toolkit 作者: ComparativeGenomicsToolkit 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _setPgid(pid, pgid):
    """set pgid of a process, ignored exception caused by race condition
    that occurs if already set by parent or child has already existed"""
    # Should just ignore on EACCES, as to handle race condition with parent
    # and child.  However some Linux kernels (seen in 2.6.18-53) report ESRCH
    # or EPERM.  To handle this is a straight-forward way, just check that the
    # change has been made.  However, in some cases the change didn't take,
    # retrying seems to make the problem go away.
    for i in xrange(0,5):
        try:
            os.setpgid(pid, pgid)
            return
        except OSError:
            if os.getpgid(pid) == pgid:
                return
            time.sleep(0.25) # sleep for retry
    # last try, let it return an error
    os.setpgid(pid, pgid)

# FIXME: why not use pipes.quote?
test_subprocess.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_start_new_session(self):
        # Covers the code path that calls setsid().  Depending on where the
        # tests run, that call may fail with EPERM; the failure still shows
        # setsid() was invoked, so that one error is tolerated.
        child_cmd = [sys.executable, "-c",
                     "import os; print(os.getpgid(os.getpid()))"]
        try:
            output = subprocess.check_output(child_cmd, start_new_session=True)
        except OSError as e:
            if e.errno == errno.EPERM:
                return
            raise
        # The child printed its own process group id; a new session implies
        # it must differ from ours.
        parent_pgid = os.getpgid(os.getpid())
        child_pgid = int(output)
        self.assertNotEqual(parent_pgid, child_pgid)
test_subprocess.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_start_new_session(self):
        # Covers the code path that calls setsid().  Depending on where the
        # tests run, that call may fail with EPERM; the failure still shows
        # setsid() was invoked, so that one error is tolerated.
        child_cmd = [sys.executable, "-c",
                     "import os; print(os.getpgid(os.getpid()))"]
        try:
            output = subprocess.check_output(child_cmd, start_new_session=True)
        except OSError as e:
            if e.errno == errno.EPERM:
                return
            raise
        # The child printed its own process group id; a new session implies
        # it must differ from ours.
        parent_pgid = os.getpgid(os.getpid())
        child_pgid = int(output)
        self.assertNotEqual(parent_pgid, child_pgid)
timeshare.py 文件源码 项目:TikZ 作者: ellisk42 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def execute(self,dt):
        """Run (or resume) the task's child process for at most `dt` seconds.

        Returns "finished" if the task had already completed, "still running"
        if the child is still alive after the time slice (its process group
        is then stopped with SIGSTOP), or otherwise whatever `self.q.get()`
        yields.
        NOTE(review): `Process` is presumably multiprocessing.Process and
        `self.q` a queue filled by the child -- confirm against the class.
        """
        if self.finished: return "finished"
        if not self.running:
            # First slice: spawn the child and move it into its own process
            # group so the whole group can be stopped/continued at once.
            self.process = Process(target = executeInProcessGroup, args = (self,))
            self.process.start()
            print "timeshare child PID:",self.process.pid
            os.setpgid(self.process.pid,self.process.pid)
            print "timeshare process group",os.getpgid(self.process.pid)
            # The child must now lead a group named after its own PID ...
            assert os.getpgid(self.process.pid) == self.process.pid
            print "my process group",os.getpgrp(),"which should be",os.getpgid(0)
            # ... and that group must not be ours, otherwise the SIGSTOP
            # below would stop this process as well.
            assert os.getpgid(self.process.pid) != os.getpgid(0)
            self.running = True
        else:
            # Later slices: wake the previously stopped process group.
            os.killpg(self.process.pid, signal.SIGCONT)

        # Let the child run for up to `dt` seconds.
        self.process.join(dt)
        if self.process.is_alive():
            # Slice used up: pause the whole group until the next call.
            os.killpg(self.process.pid, signal.SIGSTOP)
            return "still running"
        else:
            self.finished = True
            return self.q.get()
test_subprocess.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_start_new_session(self):
        # Covers the code path that calls setsid().  Depending on where the
        # tests run, that call may fail with EPERM; the failure still shows
        # setsid() was invoked, so that one error is tolerated.
        child_cmd = [sys.executable, "-c",
                     "import os; print(os.getpgid(os.getpid()))"]
        try:
            output = subprocess.check_output(child_cmd, start_new_session=True)
        except OSError as e:
            if e.errno == errno.EPERM:
                return
            raise
        # The child printed its own process group id; a new session implies
        # it must differ from ours.
        parent_pgid = os.getpgid(os.getpid())
        child_pgid = int(output)
        self.assertNotEqual(parent_pgid, child_pgid)
bootstrap.py 文件源码 项目:test-infra 作者: istio 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def terminate(end, proc, kill):
    """Terminate or kill `proc` once the deadline `end` has passed.

    Returns False without touching the process when `end` is falsy or has
    not been reached yet. Otherwise returns True after either terminating
    the process (`kill` falsy) or sending SIGKILL to its whole process group
    and killing the process itself (`kill` truthy).
    """
    if not end:
        return False
    if time.time() <= end:
        return False

    if not kill:
        logging.info('Terminate %d on timeout', proc.pid)
        proc.terminate()
        return True

    # Process will not die, kill everything
    group_id = os.getpgid(proc.pid)
    logging.info('Kill %d and process group %d', proc.pid, group_id)
    os.killpg(group_id, signal.SIGKILL)
    proc.kill()
    return True
telegram.py 文件源码 项目:pybot 作者: raelga 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def kill_process(pid):
    """Send SIGINT to the process group of `pid`, then print a `ps auxf` listing."""
    os.killpg(os.getpgid(pid), signal.SIGINT)

    listing = check_output(["ps", "auxf"]).decode('utf-8')
    print(listing)
sharedmemory.py 文件源码 项目:temboard-agent 作者: dalibo 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def purge_expired(self, ttl, logger):
        """
        Clear stale entries from the shared command array.

        Two kinds of entries are overwritten with a null Command: finished
        ones (state DONE or ERROR) whose last update time plus `ttl` lies in
        the past, and started ones whose worker process no longer exists.
        """
        for slot in range(self.size):
            entry = self.commands[slot]
            if len(entry.commandid) == T_COMMANDID_SIZE and \
                    (entry.time + ttl) < time.time() and \
                    entry.state in (COMMAND_DONE, COMMAND_ERROR):
                logger.debug("Removing command with commandid=%s" %
                             (entry.commandid))
                # Deletion: overwrite array element by a null Command.
                self.commands[slot] = Command()

            # Re-read the slot: it may just have been reset above.
            entry = self.commands[slot]
            is_started = len(entry.commandid) == T_COMMANDID_SIZE and \
                entry.state == COMMAND_START and \
                entry.pid > 0
            if not is_started:
                continue

            # A command in state COMMAND_START whose process is gone will
            # never complete, so it is dropped as well.
            try:
                os.getpgid(entry.pid)
            except OSError:
                logger.debug("Removing command with commandid=%s." % (
                             entry.commandid))
                self.commands[slot] = Command()
test_subprocess.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_start_new_session(self):
        # Covers the code path that calls setsid().  Depending on where the
        # tests run, that call may fail with EPERM; the failure still shows
        # setsid() was invoked, so that one error is tolerated.
        child_cmd = [sys.executable, "-c",
                     "import os; print(os.getpgid(os.getpid()))"]
        try:
            output = subprocess.check_output(child_cmd, start_new_session=True)
        except OSError as e:
            if e.errno == errno.EPERM:
                return
            raise
        # The child printed its own process group id; a new session implies
        # it must differ from ours.
        parent_pgid = os.getpgid(os.getpid())
        child_pgid = int(output)
        self.assertNotEqual(parent_pgid, child_pgid)
usb_checkout.py 文件源码 项目:DeviceNanny 作者: hudl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def stop_program_if_running():
    """
    Look up the process group of the running "start_checkout" script, remove
    the temp file and hand the group id to kill().
    """
    pid = get_pid("[s]tart_checkout")
    print(pid)
    print(type(pid))

    # Only the first entry returned by get_pid() is used.
    first_pid = int(pid[0])
    pgid = os.getpgid(first_pid)
    message = "[usb_checkout][stop_program_if_running] PGID: {}".format(pgid)
    logging.debug(message)

    delete_tempfile(filename)
    kill(pgid)
selflog.py 文件源码 项目:Pigrow 作者: Pragmatismo 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def check_script_running(script):
    """Report whether `script` is running, based on `pidof -x` and /proc.

    Returns a dict with three keys:
      - 'num_running': number of matching PIDs, as a string ('0' if none)
      - 'script_status': the "State" field of /proc/<pid>/status for the
        first PID, 'none' if nothing is running, 'died' if the process
        vanished while it was being inspected
      - 'script_path': raw contents of /proc/<pid>/cmdline for the first
        PID, or 'none'

    Only the first PID reported by pidof is inspected, even when several
    instances are running.
    """
    try:
        pids = [int(pid) for pid in check_output(["pidof", script, "-x"]).split()]
    except Exception:
        # check_output raises CalledProcessError on a non-zero exit status
        # and OSError when pidof cannot be executed; both are treated as
        # "not running".
        pids = []
    if not pids:
        return {'num_running':'0','script_status':'none','script_path':'none'}

    proc_dir = os.path.join('/proc', str(pids[0]))
    # Fallback in case the status file has no "State" line at all.
    script_test_status = 'unknown'
    try:
        with open(os.path.join(proc_dir, 'cmdline'), 'rb') as cmdline_file:
            script_test_path = cmdline_file.read()
        with open(os.path.join(proc_dir, 'status')) as status_file:
            for line in status_file:
                fields = line.split(':')
                if fields[0] == "State":
                    script_test_status = fields[1].strip()
    except IOError:
        # The process went away between pidof and reading /proc.
        return {'num_running':'0','script_status':'died','script_path':'none'}

    return {'num_running':str(len(pids)),'script_status':script_test_status,'script_path':script_test_path}
wrapping.py 文件源码 项目:FLASH 作者: yuyuz 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_all_p_for_pgid():
    """Return the PIDs of all other processes in the caller's process group.

    The calling process itself is excluded from the result.
    """
    own_pid = os.getpid()
    current_pgid = os.getpgid(own_pid)
    running_pid = []
    # process_iter() yields psutil.Process objects, not integers, so the PID
    # has to be taken from each object before it is handed to os.getpgid().
    for proc in psutil.process_iter():
        pid = proc.pid
        try:
            pgid = os.getpgid(pid)
        except OSError:
            # The process vanished between enumeration and lookup.
            continue

        # Don't try to kill HPOlib-run
        if pgid == current_pgid and pid != own_pid:
            # This solves the problem that a Zombie process counts
            # towards the number of process which have to be killed
            running_pid.append(pid)
    return running_pid
sh.py 文件源码 项目:unravel 作者: Unrepl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_pgid(self):
        """ Return the process group id the process belongs to right now.

        Unlike self.pgid, which holds the group id recorded at launch, this
        value is looked up afresh from the operating system on every call. """
        current_group = os.getpgid(self.pid)
        return current_group
jobs.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _set_pgrp(info):
        try:
            info['pgrp'] = os.getpgid(info['obj'].pid)
        except ProcessLookupError:
            pass
__init__.py 文件源码 项目:pifpaf 作者: jd 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _kill(self, process):
        try:
            process.terminate()
        except ProcessLookupError:
            # Python 3
            return
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            return
        try:

            self._wait(process)
        except tenacity.RetryError:
            LOG.warning("PID %d didn't terminate cleanly after 10 seconds, "
                        "sending SIGKILL to its process group", process.pid)
            # Cleanup remaining processes
            try:
                pgrp = os.getpgid(process.pid)
            except OSError as e:
                # ESRCH is returned if process just died in the meantime
                if e.errno != errno.ESRCH:
                    raise
            else:
                os.killpg(pgrp, signal.SIGKILL)
        process.wait()
executor.py 文件源码 项目:ARMPython 作者: agustingianni 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def sys_getpgid(self):
        """
        setpgid, getpgid, setpgrp, getpgrp - set/get process group

        Returns the process group id of the calling process.
        """
        # os.getpgid() requires a pid argument; 0 designates the calling
        # process. The previous argument-less call always raised TypeError.
        # NOTE(review): the handler receives no pid from the emulated
        # program, so the host process is assumed to be the target.
        return os.getpgid(0)
get_ctl_from_exp.py 文件源码 项目:ENCODE_downloader 作者: kundajelab 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run_shell_cmd(cmd):
    """Run `cmd` through the shell in its own session and return its output.

    stderr is merged into stdout. Every output line is echoed to stdout
    prefixed with the child's PID while the command runs. The combined
    output is returned with leading/trailing newlines stripped.

    Raises:
        Exception: if anything goes wrong, including a return code greater
            than zero. The child's whole process group is killed first.
    """
    # Pre-bound so the error handler can run even when Popen itself fails.
    p = None
    pid = None
    pgid = None
    try:
        p = subprocess.Popen(cmd, shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            preexec_fn=os.setsid)
        pid = p.pid
        pgid = os.getpgid(pid)
        print('run_shell_cmd: PID={}, CMD={}'.format(pid, cmd))
        lines = []
        # Iterating reads until EOF, so there is no busy loop when the child
        # closes stdout before exiting.
        for line in p.stdout:
            print('PID={}: {}'.format(pid,line.strip('\n')))
            lines.append(line)
        p.communicate() # wait here
        if p.returncode > 0:
            raise subprocess.CalledProcessError(
                p.returncode, cmd)
        return ''.join(lines).strip('\n')
    except BaseException:
        # Catches everything on purpose (KeyboardInterrupt included) so that
        # child processes never outlive a failed or interrupted call.
        log.exception('Unknown exception caught. '+ \
            'Killing process group {}...'.format(pgid))
        try:
            if pgid is not None:
                os.killpg(pgid, signal.SIGKILL)
            if p is not None:
                p.terminate()
        except OSError:
            # The group is usually gone already when the command simply
            # failed; that must not mask the exception raised below.
            pass
        raise Exception('Unknown exception caught. PID={}'.format(pid))
processproxy.py 文件源码 项目:enterprise_gateway 作者: jupyter-incubator 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def launch_process(self, kernel_cmd, **kw):
        """Launch the kernel as a local process and record its identifiers.

        After delegating to the parent class, the kernel is started with
        launch_kernel() and its pid is stored, along with its process group
        id where the platform provides os.getpgid and the lookup succeeds.
        Returns self.
        """
        super(LocalProcessProxy, self).launch_process(kernel_cmd, **kw)

        # launch the local run.sh
        self.local_proc = launch_kernel(kernel_cmd, **kw)
        self.pid = self.local_proc.pid

        lookup_pgid = getattr(os, "getpgid", None)
        if lookup_pgid is not None:
            try:
                self.pgid = lookup_pgid(self.pid)
            except OSError:
                # Best effort: self.pgid keeps whatever value it had.
                pass

        self.ip = local_ip
        launch_msg = "Local kernel launched on '{}', pid: {}, pgid: {}, KernelID: {}, cmd: '{}'" \
            .format(self.ip, self.pid, self.pgid, self.kernel_id, kernel_cmd)
        self.log.info(launch_msg)
        return self
launch_ipykernel.py 文件源码 项目:enterprise_gateway 作者: jupyter-incubator 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def return_connection_info(connection_file, ip, response_addr, disable_gateway_socket):
    """Send the kernel's connection info to the address in `response_addr`.

    The JSON content of `connection_file` is extended with this process's
    'pid' and 'pgid' (as strings) and, unless `disable_gateway_socket` is
    set, a 'comm_port' taken from a freshly prepared gateway socket. The
    result is sent UTF-8 encoded over a TCP connection to `response_addr`,
    which must have the form "host:port".

    Returns the gateway socket, or None when it is disabled or when
    `response_addr` is malformed (in which case nothing is sent).
    The `ip` argument is not used by this function.
    """
    gateway_sock = None
    response_parts = response_addr.split(":")
    if len(response_parts) != 2:
        print("Invalid format for response address '{}'.  Assuming 'pull' mode...".format(response_addr))
        return

    response_ip = response_parts[0]
    try:
        response_port = int(response_parts[1])
    except ValueError:
        print("Invalid port component found in response address '{}'.  Assuming 'pull' mode...".format(response_addr))
        return

    # The with-statement closes the file; no explicit close() is needed.
    with open(connection_file) as fp:
        cf_json = json.load(fp)

    # add process and process group ids into connection info.
    pid = os.getpid()
    cf_json['pid'] = str(pid)
    cf_json['pgid'] = str(os.getpgid(pid))

    # prepare socket address for handling signals
    if not disable_gateway_socket:
        gateway_sock = prepare_gateway_socket()
        cf_json['comm_port'] = gateway_sock.getsockname()[1]

    s = socket(AF_INET, SOCK_STREAM)
    try:
        s.connect((response_ip, response_port))
        # sendall() keeps sending until the whole payload is out; send() may
        # transmit only part of it and leave the receiver with truncated JSON.
        s.sendall(json.dumps(cf_json).encode(encoding='utf-8'))
    finally:
        s.close()

    return gateway_sock
bag.py 文件源码 项目:rostrace 作者: ChrisTimperley 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def record():
    """Run `rosbag record -a` in its own session until it exits.

    Whatever way the recording ends (normal exit, error or interruption),
    the recorder's whole process group is sent SIGKILL afterwards so that
    no child processes are left behind.
    """
    import errno

    print("Recording to rosbag...")
    print("hmmm")
    cmd = "rosbag record -a"
    p = None
    pgid = None
    try:
        p = subprocess.Popen(cmd, preexec_fn=os.setsid, shell=True)
        # Looked up while the child is certain to exist: once communicate()
        # has reaped it, os.getpgid(p.pid) raises ProcessLookupError.
        pgid = os.getpgid(p.pid)
        p.communicate()
    finally:
        # pgid stays None when Popen itself failed; nothing to clean up then.
        if pgid is not None:
            print("Killing PG: {}".format(pgid))
            try:
                # The shell leads its own group (setsid), so this reaches
                # the shell as well as everything it started.
                os.killpg(pgid, signal.SIGKILL)
            except OSError as e:
                # ESRCH: the whole group has already exited.
                if e.errno != errno.ESRCH:
                    raise
            # Reap the child; returns immediately if already collected.
            p.wait()
lib.py 文件源码 项目:FogLAMP 作者: foglamp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _check_semaphore_file(cls, file_name):
        """ Tell whether a backup or restore operation is currently running

        The pid stored in the semaphore file is checked against the running
        processes. A semaphore file left behind by a process that no longer
        exists is removed and reported with a warning.

        Args:
            file_name: semaphore file, full path
        Returns:
            pid: 0= no operation is in execution or the pid retrieved from the semaphore file
        Raises:
        """

        _logger.debug("{func}".format(func="check_semaphore_file"))

        if not os.path.exists(file_name):
            return 0

        pid = cls._pid_file_retrieve(file_name)

        # Check if the process is really running
        try:
            os.getpgid(pid)
        except ProcessLookupError:
            # Process is not running, removing the semaphore file
            os.remove(file_name)

            _message = _MESSAGES_LIST["e000002"].format(file_name, pid)
            _logger.warning("{0}".format(_message))

            return 0

        return pid
posix.py 文件源码 项目:slug 作者: xonsh 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def pgid(self):
        """
        Return the process group ID, or None while self.pid is still None
        (i.e. the process has not been started yet).

        POSIX only.
        """
        if self.pid is None:
            return None
        return os.getpgid(self.pid)
utils.py 文件源码 项目:fabric8-analytics-worker 作者: fabric8-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, timeout=None, is_json=False, **kwargs):
        """Execute self.command, waiting at most `timeout` seconds for it.

        The command runs in a helper thread. If it is still running when the
        timeout expires, its whole process group is killed with SIGKILL.
        Extra keyword arguments are passed on to Popen; the special key
        `update_env` is merged into a copy of the current environment
        instead of replacing it.

        :param timeout: how long to wait, in seconds, for the command to finish
        before terminating it
        :param is_json: hint whether output of the command is a JSON
        :return: triplet (return code, stdout, stderr), stdout will be a
        dictionary if `is_json` is True
        """
        logger.debug("running command '%s'; timeout '%s'", self.command, timeout)

        # Body of the helper thread: run the command and store its results.
        def worker(**popen_kwargs):
            try:
                self.process = Popen(self.command, universal_newlines=True, **popen_kwargs)
                self.output, self.error = self.process.communicate()
                self.status = self.process.returncode
            except Exception:
                self.output = {} if is_json else []
                self.error = format_exc()
                self.status = -1

        # Capture stdout and stderr unless the caller decided otherwise.
        kwargs.setdefault('stdout', PIPE)
        kwargs.setdefault('stderr', PIPE)
        if 'update_env' in kwargs:
            # make sure we update environment, not override it
            kwargs['env'] = dict(os_environ, **kwargs.pop('update_env'))

        runner = Thread(target=worker, kwargs=kwargs)
        runner.start()
        runner.join(timeout)

        # Still alive after join(): the timeout was reached.
        if runner.is_alive():
            logger.error('Command {cmd} timed out after {t} seconds'.format(cmd=self.command,
                                                                            t=timeout))
            # this is tricky - we need to make sure we kill the process with all its subprocesses;
            #  using just kill might create zombie process waiting for subprocesses to finish
            #  and leaving us hanging on thread.join()
            # TODO: we should do the same for get_command_output!
            killpg(getpgid(self.process.pid), signal.SIGKILL)
            runner.join()
            if not self.error:
                self.error = 'Killed by timeout after {t} seconds'.format(t=timeout)

        if self.output:
            if is_json:
                self.output = json.loads(self.output)
            else:
                non_empty_lines = [f for f in self.output.split('\n') if f]
                self.output = non_empty_lines

        return self.status, self.output, self.error
pytest_plugin.py 文件源码 项目:wampy 作者: noisyboiler 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def kill_crossbar(try_again=True):
    """Send SIGTERM to every Crossbar process still running.

    If sending SIGTERM fails for an id, SIGKILL is tried as a fallback. When
    processes remain afterwards and `try_again` is True, the function sleeps
    five seconds and runs once more with `try_again=False`; if processes
    still remain on that second pass an error is logged.

    NOTE(review): the id that is signalled is the process *group* id
    returned by os.getpgid(), yet it is passed to os.kill() as a pid --
    confirm this is intended.
    """
    pids = get_process_ids()
    if pids and try_again is True:
        logger.error(
            "Crossbar.io did not stop when sig term issued!"
        )

    for pid_as_str in pids:
        try:
            pid = os.getpgid(int(pid_as_str))
        except OSError:
            # The process is already gone.
            continue

        logger.warning("OS sending SIGTERM to crossbar pid: %s", pid)

        try:
            os.kill(pid, signal.SIGTERM)
        except Exception:  # anything Twisted raises
            logger.exception(
                "Failed to terminate router process: %s", pid
            )

            try:
                # os.waitpid() accepts positional arguments only; passing
                # `options=` as a keyword raises TypeError.
                os.waitpid(pid, os.WNOHANG)
            except OSError:
                pass

            try:
                os.kill(pid, signal.SIGKILL)
            except Exception as exc:
                if "No such process" in str(exc):
                    continue
                logger.exception(
                    "Failed again to terminate router process: %s", pid)

    pids = get_process_ids()
    if pids and try_again is True:
        logger.warning('try one more time to shutdown Crossbar')
        sleep(5)
        kill_crossbar(try_again=False)
    elif pids and try_again is False:
        logger.error("Failed to shutdown all router processes")
twisted.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def terminateProcess(
        pid, done, *, term_after=0.0, quit_after=5.0, kill_after=10.0):
    """Terminate the given process with escalating signals.

    Three signals are scheduled on the reactor as soon as this is called:

      1. SIGTERM to process `pid` after `term_after` seconds.
      2. SIGQUIT to the process *group* of `pid` after `quit_after` seconds.
      3. SIGKILL to the process *group* of `pid` after `kill_after` seconds.

    Steps 2 and 3 are guarded: when `pid` shares its process group with the
    invoking process, only the process itself is signalled, so the caller
    cannot kill itself by accident. For best effect, make new processes
    process group leaders soon after spawning them.

    Signals that are still pending when `done` fires are cancelled.

    :param pid: The PID to terminate.
    :param done: A `Deferred` that fires when the process exits.
    """
    own_pgid = os.getpgrp()

    def signal_process(sig):
        """Send `sig` to `pid`; a process that has already exited is ignored."""
        try:
            _os_kill(pid, sig)
        except ProcessLookupError:
            pass  # Already exited.

    def signal_group(sig):
        """Send `sig` to the process group of `pid`.

        Falls back to signalling only `pid` when it is in the same process
        group as the invoking process.
        """
        try:
            target_pgid = os.getpgid(pid)
            if target_pgid != own_pgid:
                _os_killpg(target_pgid, sig)
            else:
                _os_kill(pid, sig)
        except ProcessLookupError:
            pass  # Already exited.

    schedule = (
        (term_after, signal_process, signal.SIGTERM),
        (quit_after, signal_group, signal.SIGQUIT),
        (kill_after, signal_group, signal.SIGKILL),
    )
    pending = tuple(
        reactor.callLater(delay, send, sig) for delay, send, sig in schedule)

    def cancel_pending():
        # Drop every signal that has not been delivered yet.
        for call in pending:
            if call.active():
                call.cancel()

    done.addBoth(callOut, cancel_pending)


问题


面经


文章

微信
公众号

扫码关注公众号