python类TimeoutExpired()的实例源码

autotest.py 文件源码 项目:cs-autotests 作者: galaharon 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        """Runs the autotest. Diff will be stored in self.diff

        The binary ``self.binary`` is started with ``self.args``; every item
        of ``self.input`` is encoded and sent to its stdin.  The outcome is
        written to ``self.diff``:

        * anything on stderr -> an error report containing stdout and stderr,
        * otherwise -> ``diff(stdout, self.expected)``,
        * no result within ``self.time_limit`` seconds -> a red
          "Time limit exceeded." message.
        """
        process = subprocess.Popen(
            [self.binary] + self.args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr must be piped, otherwise communicate() always returns
            # None for it and the error branch below can never run.
            stderr=subprocess.PIPE,
        )

        # Hand the whole input to communicate() instead of writing to stdin
        # by hand: it interleaves writing and reading, so a child that emits
        # a lot of output before consuming its input cannot block us.
        payload = b"".join(line.encode() for line in self.input)
        try:
            out, err = process.communicate(payload, timeout=self.time_limit)
        except subprocess.TimeoutExpired:
            # communicate() does not stop the child on timeout; kill it and
            # reap it so it does not keep running in the background.
            process.kill()
            process.communicate()
            self.diff = colours['red']('Time limit exceeded.')
            return

        if err:
            self.diff = """Encountered error running test:
                Output: {}
                Error output: {}
                """.format(out.decode(), err.decode())
        else:
            self.diff = diff(out.decode(), self.expected)
pbimport.py 文件源码 项目:protofuzz 作者: trailofbits 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _compile_proto(full_path, dest):
    """Run ``protoc`` on ``full_path`` and write the Python output to ``dest``.

    Returns True when protoc exits with status 0 and False when it does not
    finish within 5 seconds (the process is killed in that case).

    Raises:
        BadProtobuf: protoc finished with a non-zero exit status; the message
            carries the decoded stderr and stdout.
    """
    source_dir = os.path.dirname(full_path)
    command = [
        find_protoc(),
        '--python_out={}'.format(dest),
        '--proto_path={}'.format(source_dir),
        full_path,
    ]
    compiler = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    try:
        stdout_data, stderr_data = compiler.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        # Stop the stuck compiler and drain its pipes so it gets reaped.
        compiler.kill()
        compiler.communicate()
        return False

    if compiler.returncode == 0:
        return True

    raise BadProtobuf(
        'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
            full_path,
            stderr_data.decode('utf-8'),
            stdout_data.decode('utf-8')))
recipe-579056.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def my_thread():
    """Worker loop: pop entries from the global ``files`` and run each one.

    For every popped entry the shell command ``path + entry + options`` is
    executed with the global ``timeout``.  Timeouts, non-zero exit codes and
    any other exception are reported on stdout and the loop moves on to the
    next entry.  The worker stops once ``files`` is empty.
    """
    global files, path, timeout, options
    myname = threading.current_thread().name
    while True:
        # Pop first and treat an empty list as "done".  Checking ``files``
        # and popping in two steps lets another worker take the last entry
        # in between, which would raise IndexError here.
        try:
            nextfile = files.pop()
        except IndexError:
            break
        # print name of thread and command being run
        print('Thread {0} starts processing {1}'.format(myname, nextfile))
        f = path + nextfile + options
        try:
            # timeout interrupts frozen command, shell=True doesn't open a console
            subprocess.check_call(args=f, shell=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            # {1} is the file being processed; the message used to repeat
            # the thread name in its place.
            print('Thread {0} Processing {1} took too long'.format(myname, nextfile))
        except subprocess.CalledProcessError as e:
            print('Thread {0} Processing {1} returned error {2}:{3}'.format(myname, nextfile, e.returncode, e.output))
        except Exception as e:
            print('Thread {0} Processing {1} returned error {2}'.format(myname, nextfile, type(e).__name__))
    print('thread {0} stopped'.format(myname))
spct_utils.py 文件源码 项目:specton 作者: somesortoferror 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def runCmd(cmd,cmd_timeout=300):
    ''' run command without showing console window on windows - return stdout and stderr as strings

    cmd is handed to subprocess.Popen with shell=False.  cmd_timeout is the
    number of seconds the command may run; when it is exceeded the process is
    killed and two empty strings are returned.  Two empty strings are also
    returned when the process could not be started with a SubprocessError.
    Output is decoded as UTF-8 with undecodable bytes replaced.
    '''
    startupinfo = None
    output = ""
    output_err = ""
    debug_log("runCmd: {}".format(cmd))
    if os.name == 'nt':
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    try:
        proc = subprocess.Popen(cmd,bufsize=-1,startupinfo=startupinfo,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=None,shell=False,universal_newlines=False)
    except subprocess.SubprocessError as e:
        proc = None
        debug_log("exception in runCmd: {}".format(e),logging.ERROR)
    if proc is None:
        debug_log("runCmd: Proc was none",logging.WARNING)
        return output,output_err
    try:
        # The timeout has to be passed here; without it communicate() blocks
        # until the child exits and TimeoutExpired is never raised.
        outputb, output_errb = proc.communicate(timeout=cmd_timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Drain the pipes and reap the killed child; its partial output is
        # deliberately discarded so callers still get empty strings.
        proc.communicate()
        debug_log("runCmd: Process killed due to timeout",logging.WARNING)
    else:
        output = outputb.decode('utf-8','replace')
        output_err = output_errb.decode('utf-8','replace')
    return output,output_err
remote_agent.py 文件源码 项目:ParlAI 作者: facebookresearch 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def shutdown(self):
        """Send the ``<END>`` signal to the paired listener and stop the subprocess.

        Both steps are skipped when the corresponding attribute (``socket`` /
        ``process``) does not exist on the instance.
        """
        if hasattr(self, 'socket'):
            try:
                self.socket.send_unicode('<END>', zmq.NOBLOCK)
            except zmq.error.ZMQError:
                # The send was refused; do one non-blocking receive and then
                # retry the send once.
                try:
                    self.socket.recv_unicode(zmq.NOBLOCK)
                except zmq.error.ZMQError:
                    # nothing to receive either - peer is probably gone
                    pass
                else:
                    try:
                        self.socket.send_unicode('<END>', zmq.NOBLOCK)
                    except zmq.error.ZMQError:
                        # peer is probably gone
                        pass

        if not hasattr(self, 'process'):
            return

        # Give the subprocess one second to finish by itself before killing it.
        try:
            self.process.communicate(timeout=1)
        except subprocess.TimeoutExpired:
            self.process.kill()
install.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def adb_pushfile(adb, filepath, remote_path):
    """Push ``filepath`` to ``remote_path`` through ``adb`` with a progress bar.

    The push is started with ``adb.raw_cmd('push', ...)``.  While it runs, the
    size reported by ``get_file_size(adb, remote_path)`` is polled every half
    second and shown on a tqdm bar whose total is the local file size.  On
    KeyboardInterrupt or SystemExit the push process is killed before the
    exception is re-raised.
    """
    total_bytes = os.path.getsize(filepath)
    progress = tqdm.tqdm(unit='B', unit_scale=True, total=total_bytes)
    push_proc = adb.raw_cmd('push', filepath, remote_path)

    while True:
        try:
            push_proc.wait(0.5)
        except subprocess.TimeoutExpired:
            # Still transferring: refresh the bar and poll again.
            progress.n = get_file_size(adb, remote_path)
            progress.refresh()
            continue
        except (KeyboardInterrupt, SystemExit):
            push_proc.kill()
            raise
        # wait() returned, so the push process has exited.
        break
    progress.close()
life360.py 文件源码 项目:smarthome 作者: skalavala 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def exec_shell_command( self, command ):
        """Run ``command`` through the shell and return its stripped stdout.

        The command gets 50 seconds.  On a non-zero exit status or a timeout
        ``self.value`` is set to ``CONST_STATE_ERROR``, an error is logged and
        ``None`` is returned; otherwise the output decoded as UTF-8 is
        returned.
        """
        try:
            raw = subprocess.check_output( command, shell=True, timeout=50 )
            output = raw.strip().decode('utf-8')
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # Both failure modes are handled the same way.
            self.value = CONST_STATE_ERROR
            output = None

        if output is None:
            _LOGGER.error( "Life360 has not responsed well. Nothing to worry, will try again!" )
            self.value = CONST_STATE_ERROR
            return None
        return output
tarsnap.py 文件源码 项目:snappy 作者: ricci 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def runTarsnap(args, timeout = None):
    """Run tarsnap with ``args`` and return its captured result.

    The command line is ``config.tarsnap_bin`` followed by
    ``config.tarsnap_extra_args`` and ``args``.  If ``timeout`` seconds pass
    before tarsnap finishes it is sent SIGQUIT and waited for without a
    limit.  A non-zero exit status ends the program through ``sys.exit`` with
    a message containing the command and both output streams.

    Returns a ``CheapoCompletedProcess`` with ``stdout``, ``stderr`` (text)
    and ``returncode`` filled in.
    """
    command = [config.tarsnap_bin] + config.tarsnap_extra_args + args
    proc = subprocess.Popen(command,
                stdout = subprocess.PIPE, stderr = subprocess.PIPE,
                stdin = subprocess.DEVNULL, universal_newlines = True)
    result = CheapoCompletedProcess()

    timed_out = False
    try:
        captured = proc.communicate(timeout = timeout)
    except subprocess.TimeoutExpired:
        timed_out = True
        print("Tarsnap timed out, sending SIGQUIT...")
        proc.send_signal(signal.SIGQUIT)
        captured = proc.communicate()

    result.stdout, result.stderr = captured
    result.returncode = proc.wait()
    if timed_out:
        print("Tarsnap finished")

    if result.returncode:
        sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr))
    return result
eval.py 文件源码 项目:kitsuchan-2 作者: n303p4 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
async def sh(self, ctx, *, command):
        """Execute a system command. Bot owner only."""
        # This has to be a coroutine: the body awaits ctx.send(), which is a
        # syntax error inside a plain ``def``.
        #
        # The command string is split on single spaces (no shell quoting
        # rules) and run without a shell.  Its stdout is sent back in
        # paginated ``bash`` code blocks; stderr is captured but not shown.
        # NOTE(review): communicate() blocks the calling thread for up to
        # 8 seconds - confirm that is acceptable for this bot.
        command = command.split(" ")
        process = subprocess.Popen(command,
                                   universal_newlines=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        try:
            output, errors = process.communicate(timeout=8)
            output = output.split("\n")
        except subprocess.TimeoutExpired:
            process.kill()
            # Drain the pipes and reap the killed child.
            process.communicate()
            output = ["Command timed out. x.x"]
        paginator = commands.Paginator(prefix="```bash")
        for line in output:
            paginator.add_line(line)
        for page in paginator.pages:
            await ctx.send(page)
npm.py 文件源码 项目:bridge-test 作者: Half-Shot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def install(self, path):
        """Run ``npm install`` in ``path`` and wait for it to finish.

        Returns True on success.

        Raises:
            Exception: npm exited with a non-zero return code.
        """
        logger.info("Installing npm packages...")
        process = subprocess.Popen(
            ["npm", "install"],
            cwd=path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # stdout/stderr are pipes, so they must be drained while waiting:
        # a plain wait() blocks forever once npm fills a pipe buffer.
        # communicate() reads both streams and then reaps the process.
        process.communicate()
        # Compare by value; ``is not 0`` tests object identity.
        if process.returncode != 0:
            raise Exception("Return code was non-zero")
        logger.info("Done.")
        return True
npm.py 文件源码 项目:bridge-test 作者: Half-Shot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __read_process_stream(self, proc, kill_after=None):
        """Collect the decoded stdout and stderr of ``proc``.

        Waits up to ``kill_after`` seconds (no limit when None).  If the
        process has not finished by then it is terminated and whatever output
        the timeout exception carries is used instead.

        Returns:
            ``(stdout_text, stderr_text, timed_out)``
        """
        stdout_text = ""
        stderr_text = ""
        timed_out = False
        try:
            raw_out, raw_err = proc.communicate(timeout=kill_after)
        except subprocess.TimeoutExpired as expired:
            proc.terminate()
            proc.wait()
            # The exception only carries output that was read before the
            # timeout; either stream may be missing.
            if expired.stdout is not None:
                stdout_text = expired.stdout.decode()
            if expired.stderr is not None:
                stderr_text = expired.stderr.decode()
            timed_out = True
        else:
            stdout_text = raw_out.decode()
            stderr_text = raw_err.decode()
        # Logged on both paths, whether or not the process was terminated here.
        logger.debug("%s was terminated", str(" ".join(proc.args)))
        return (stdout_text, stderr_text, timed_out)
rofi.py 文件源码 项目:rofi-skyss 作者: torrottum 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def close(self):
        """Close any open window.

        Note that this only works with non-blocking methods.

        The tracked process is first sent SIGINT; if it has not exited after
        one second it is sent SIGKILL.  Afterwards the process reference is
        cleared.  Nothing happens when no process is tracked.

        """
        running = self._process
        if not running:
            return

        # Polite request first.
        running.send_signal(signal.SIGINT)

        # Escalate when the process ignores the request for a full second.
        try:
            running.wait(timeout=1)
        except subprocess.TimeoutExpired:
            running.send_signal(signal.SIGKILL)

        # Forget the process.
        self._process = None
git.py 文件源码 项目:marge-bot 作者: smarkets 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _run(*args, env=None, check=False, timeout=None):
    """Run a command and return a ``subprocess.CompletedProcess``.

    Args:
        *args: command and arguments as ``str``; each is encoded as UTF-8.
        env: environment passed to ``subprocess.Popen``.
        check: raise ``subprocess.CalledProcessError`` on a non-zero exit
            status.
        timeout: seconds to wait before the process is killed.

    Raises:
        TimeoutExpired: the command did not finish in time; the exception
            carries the output collected until the process was killed.
        subprocess.CalledProcessError: ``check`` is true and the command
            exited with a non-zero status.
    """
    with subprocess.Popen([a.encode('utf-8') for a in args], env=env, stdout=PIPE, stderr=PIPE) as process:
        try:
            # No stdin pipe is opened, so there is nothing to send.  The
            # bare name ``input`` used to be passed here, which refers to
            # the builtin function rather than to any data.
            stdout, stderr = process.communicate(timeout=timeout)
        except TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr,
            )
        except:
            # Deliberately catches everything (including KeyboardInterrupt)
            # so the child is never left running; the exception is re-raised.
            process.kill()
            process.wait()
            raise
        retcode = process.poll()
        if check and retcode:
            raise subprocess.CalledProcessError(
                retcode, process.args, output=stdout, stderr=stderr,
            )
        return subprocess.CompletedProcess(process.args, retcode, stdout, stderr)
resume.py 文件源码 项目:tensorflow-layer-library 作者: bioinf-jku 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sigint_handler(sig, frame):
    """Signal handler: stop the sub-process, remove the working dir and exit.

    While the sub-process referenced by the global ``process_handle`` has no
    return code and fewer than ``kill_retry_max`` attempts were made, SIGTERM
    is sent to its process group and the handler waits up to one second for
    it.  Afterwards ``working_dir`` is removed (when set) and the program
    exits with status 0.
    """
    global kill_retry_count
    print("Killing sub-process...")
    if process_handle is not None:
        while True:
            if process_handle.returncode is not None:
                break
            if kill_retry_count >= kill_retry_max:
                break
            kill_retry_count += 1
            print("Killing sub-process ({})...".format(kill_retry_count))
            try:
                group_id = os.getpgid(process_handle.pid)
                os.killpg(group_id, signal.SIGTERM)
                os.waitpid(process_handle.pid, os.WNOHANG)
            except ProcessLookupError:
                # The process (group) no longer exists; stop retrying.
                break

            try:
                process_handle.wait(1)
            except subprocess.TimeoutExpired:
                # Not gone yet; the loop condition decides about a retry.
                pass

    if working_dir is not None:
        rmdir(working_dir)

    sys.exit(0)
utils.py 文件源码 项目:donkey 作者: wroscoe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run_shell_command(cmd, cwd=None, timeout=15):
    """Run ``cmd`` and return ``(out, err, pid)``.

    Args:
        cmd: command passed to ``subprocess.Popen``.
        cwd: working directory for the command.
        timeout: seconds to wait before ``kill(proc.pid)`` is called.

    Returns:
        ``out`` - list of stdout lines decoded to ``str``;
        ``err`` - list of stderr lines as raw ``bytes`` (not decoded);
        ``pid`` - process id of the command.
    """
    import io

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)

    # communicate() reads both pipes while waiting.  Calling wait() first
    # and reading afterwards stalls as soon as the command writes more than
    # a pipe buffer, because the child blocks on write and never exits.
    try:
        stdout_data, stderr_data = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        kill(proc.pid)
        stdout_data, stderr_data = proc.communicate()

    # BytesIO.readlines() splits on b"\n" and keeps the line endings, the
    # same way reading the pipe objects line by line does.
    out = [line.decode() for line in io.BytesIO(stdout_data).readlines()]
    err = io.BytesIO(stderr_data).readlines()
    return out, err, proc.pid
start_controller.py 文件源码 项目:parsl 作者: Parsl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def close(self):
        ''' Terminate the controller process and its child processes.

        Does nothing (besides a debug message) when ``self.reuse`` is set.
        Otherwise SIGTERM and, 0.2 seconds later, SIGKILL are sent to the
        process group of ``self.proc``, and the process is waited for up to
        one second.  Failures are logged as warnings, not raised.

        Args:
              - None
        '''
        if self.reuse :
            logger.debug("Ipcontroller not shutting down: reuse enabled")
            return

        try:
            pgid = os.getpgid(self.proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            time.sleep(0.2)
            os.killpg(pgid, signal.SIGKILL)
            try:
                self.proc.wait(timeout=1)
                x = self.proc.returncode
                logger.debug("Controller exited with {0}".format(x))
            except subprocess.TimeoutExpired :
                # Logger.warn() is a deprecated alias; warning() is the
                # documented method.
                logger.warning("Ipcontroller process:{0} cleanup failed. May require manual cleanup".format(self.proc.pid))

        except Exception as e:
            logger.warning("Failed to kill the ipcontroller process[{0}]: {1}".format(self.proc.pid,
                                                                                      e))
utils.py 文件源码 项目:mobly 作者: google 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def wait_for_standing_subprocess(proc, timeout=None):
    """Block until ``proc`` exits or ``timeout`` elapses.

    This is a thin wrapper around ``proc.wait(timeout)``; whatever that call
    raises is propagated unchanged.  In particular
    ``subprocess.TimeoutExpired`` is raised when the process is still running
    after ``timeout`` seconds.

    No exception means the process ended on its own and does not need to be
    stopped with stop_standing_subprocess().

    An exception means the process is still alive.  Either stop it with
    stop_standing_subprocess() or call this function again to keep waiting.

    Args:
        proc: Subprocess to wait for.
        timeout: Number of seconds to wait before timing out; ``None`` waits
            without a limit.
    """
    # The return value of wait() is intentionally not returned.
    proc.wait(timeout)
shell.py 文件源码 项目:MUBench 作者: stg-tud 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def exec(command: str, cwd: str = os.getcwd(), logger=logging.getLogger(__name__), timeout: Optional[int] = None):
        # Runs ``command`` in ``cwd`` via Shell.__exec and returns its output.
        #
        # Raises CommandFailedError when the command fails with a
        # CalledProcessError and TimeoutError when it exceeds ``timeout``;
        # both carry the decoded output captured so far.
        #
        # NOTE(review): the ``cwd`` default is evaluated once, when the
        # function is defined, not on every call.
        logger.debug("Execute '%s' in '%s'", command, cwd)
        encoding = Shell.__get_encoding()
        try:
            # On our Debian server subprocess does not return until after the process finished, but then correctly
            # raises TimeoutExpired, if the process took to long. We use `timeout` to ensure that the process terminates
            # eventually.
            if "Linux" in platform() and timeout is not None:
                command = "timeout {} {}".format(timeout + 60, command)

            output = Shell.__exec(command, cwd, timeout, encoding)
            logger.debug(output)
            return output
        except CalledProcessError as e:
            # output/stderr are None when the stream was not captured or
            # nothing was read; fall back to empty bytes so the real failure
            # is reported instead of an AttributeError on ``None.decode``.
            raise CommandFailedError(e.cmd, (e.output or b"").decode(encoding), (e.stderr or b"").decode(encoding))
        except TimeoutExpired as e:
            raise TimeoutError(e.cmd, (e.output or b"").decode(encoding))
test_subprocess.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_communicate_timeout(self):
        # The child writes one line to stderr, sleeps a second, writes a
        # second line and finally echoes its stdin to stdout.
        child_code = ('import sys,os,time;'
                      'sys.stderr.write("pineapple\\n");'
                      'time.sleep(1);'
                      'sys.stderr.write("pear\\n");'
                      'sys.stdout.write(sys.stdin.read())')
        p = subprocess.Popen([sys.executable, "-c", child_code],
                             universal_newlines=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # The first call must give up after 0.3 seconds.
        with self.assertRaises(subprocess.TimeoutExpired):
            p.communicate("banana", timeout=0.3)
        # A second call without a timeout must still deliver the complete
        # output of the child.
        stdout, stderr = p.communicate()
        self.assertEqual(stdout, "banana")
        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
test_subprocess.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_communicate_timeout_large_ouput(self):
        # A timeout must be reported while the child is still producing a
        # large amount of output: four 64 KiB chunks, 0.2 s apart.
        write_chunk = 'sys.stdout.write("a" * (64 * 1024));'
        pause = 'time.sleep(0.2);'
        child_code = ('import sys,os,time;'
                      + (write_chunk + pause) * 3
                      + write_chunk)
        p = subprocess.Popen([sys.executable, "-c", child_code],
                             stdout=subprocess.PIPE)
        with self.assertRaises(subprocess.TimeoutExpired):
            p.communicate(timeout=0.4)
        # The follow-up call has to return everything the child wrote.
        stdout, _ = p.communicate()
        self.assertEqual(len(stdout), 4 * 64 * 1024)

    # Test for the fd leak reported in http://bugs.python.org/issue2791.
utils.py 文件源码 项目:online-judge-tools 作者: kmyk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def exec_command(command, timeout=None, **kwargs):
    """Run ``command`` and return ``(stdout_bytes, process)``.

    stdout is captured, stderr goes to ``sys.stderr`` and ``kwargs`` are
    forwarded to ``subprocess.Popen``.  A missing or non-executable command
    is logged and ends the program with exit status 1.  When ``timeout``
    seconds pass before the command finishes, empty bytes are returned as
    the output; the process is not stopped here.
    """
    try:
        proc = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=sys.stderr,
                                **kwargs)
    except FileNotFoundError:
        log.error('No such file or directory: %s', command)
        sys.exit(1)
    except PermissionError:
        log.error('Permission denied: %s', command)
        sys.exit(1)

    try:
        answer = proc.communicate(timeout=timeout)[0]
    except subprocess.TimeoutExpired:
        answer = b''
    return answer, proc

# We should use this instead of posixpath.normpath
# posixpath.normpath doesn't collapse leading duplicated slashes. See: https://stackoverflow.com/questions/7816818/why-doesnt-os-normpath-collapse-a-leading-double-slash
xena.py 文件源码 项目:vswitchperf 作者: opnfv 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _wait_xena_2544_complete(self):
        """
        Wait for Xena2544.exe completion.

        Waits on ``self.mono_pipe`` in 60 second slices.  After every slice
        that times out, new text is read from ``self._log_handle``; once the
        accumulated text contains ``TestCompletedSuccessfully`` the process
        is terminated.  The log handle is closed before returning.
        :return: None
        """
        log_text = ''
        finished = False
        while not finished:
            try:
                self.mono_pipe.wait(60)
            except subprocess.TimeoutExpired:
                # The process is still running: look for the completion
                # marker in the log in case mono is deadlocked.
                log_text += self._log_handle.read()
                if 'TestCompletedSuccessfully' in log_text:
                    self._log_handle.close()
                    self.mono_pipe.terminate()
                    finished = True
            else:
                self._log_handle.close()
                finished = True
updatableProcess.py 文件源码 项目:wifimitm 作者: mvondracek 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Stop the process if it is running, then refresh its state.

        A running process is asked to terminate and given 10 seconds; if it
        is still alive after that it is killed.  ``self.update()`` is called
        in every case.

        :raises ValueError: cleanup was already performed on this process
        """
        if self.cleaned:
            raise ValueError("Can't call stop on process after cleanup was performed.")

        still_running = self.poll() is None
        if still_running:
            self.terminate()
            logger.debug('Waiting for {} process to terminate.'.format(type(self).__name__))
            try:
                self.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # Termination was ignored; force the process down.
                self.kill()
                logger.warning('Process {} killed after unsuccessful termination.'.format(type(self).__name__))
            else:
                logger.debug('Process {} terminated.'.format(type(self).__name__))

        self.update()
core.py 文件源码 项目:GAFBot 作者: DiNitride 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
async def update(self, ctx):
        """
        Updates the bot from the Github repo
        """
        # This has to be a coroutine: the body awaits ctx.send(), which is a
        # syntax error inside a plain ``def``.
        # NOTE(review): subprocess.run() blocks the calling thread for up to
        # 30 seconds - confirm that is acceptable for this bot.
        await ctx.send("Calling process to update! :up: :date: ")
        try:
            # check=True is required for CalledProcessError to be raised at
            # all; without it a failed pull is reported as a successful
            # update and the bot logs out.
            done = subprocess.run("git pull", shell=True, stdout=subprocess.PIPE,
                                  timeout=30, check=True)
        except subprocess.CalledProcessError:
            await ctx.send("Error updating! :exclamation: ")
            return
        except subprocess.TimeoutExpired:
            await ctx.send("Error updating - Process timed out! :exclamation: ")
            return

        message = done.stdout.decode()
        await ctx.send("`{}`".format(message))
        # Accept both spellings of git's "nothing to do" message.
        if message.strip() in ("Already up-to-date.", "Already up to date."):
            await ctx.send("No update available :no_entry:")
        else:
            await ctx.send("Succesfully updated! Rebooting now :repeat: ")
            await self.bot.logout()
vm.py 文件源码 项目:build 作者: freenas 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def do_install():
    """Boot the bhyve VM for the unattended install and wait for it to end.

    The VM is started through ``sh_spawn`` and given one hour; when it is
    still running after that, ``fail`` is called with a timeout message.
    """
    info('Starting up VM for unattended install')
    bhyve_command = (
        'bhyve -m ${MEMSIZE} -c ${CORES} -A -H -P',
        '-s 3:0,ahci-hd,${destdir}/boot.img',
        '-s 4:0,ahci-hd,${destdir}/hd1.img',
        '-s 5:0,ahci-hd,${destdir}/hd2.img',
        '-s 6:0,ahci-cd,${isopath}',
        '-s 7:0,virtio-net,${tapdev}',
        '-s 8:0,fbuf,tcp=5900,w=1024,h=768',
        '-s 31,lpc',
        '-l bootrom,/usr/local/share/uefi-firmware/BHYVE_UEFI.fd',
        '${VM_NAME}',
    )
    vm_proc = sh_spawn(*bhyve_command)

    one_hour = 3600
    try:
        vm_proc.wait(timeout=one_hour)
    except subprocess.TimeoutExpired:
        fail('Install timed out after 1 hour')
perf_fs.py 文件源码 项目:tfhfs 作者: fingon 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def close_fs(t):
    """Unmount the filesystem described by ``t`` and stop its process.

    Args:
        t: ``(process, argv)`` pair; ``argv`` is parsed with
            ``test_fs.argument_parser()`` to obtain the mount point.

    The unmount is best effort.  The process is given 3 seconds to exit,
    then terminated and given another 3 seconds, then killed.
    """
    import test_fs

    (p, args) = t
    args = test_fs.argument_parser().parse_args(args)
    try:
        subprocess.call(['umount', args.mountpoint], stderr=subprocess.DEVNULL)
    except Exception:
        # Best effort: a failing or missing umount must not prevent the
        # process cleanup below.  KeyboardInterrupt and SystemExit are no
        # longer swallowed, as they were by the bare ``except``.
        pass
    try:
        p.wait(timeout=3)
    except subprocess.TimeoutExpired:
        p.terminate()
        try:
            p.wait(timeout=3)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()
eos_starter_lib.py 文件源码 项目:thesis_scripts 作者: PhilippKopp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(cmd, timeout_sec):
    """Run the command line ``cmd`` with a time limit of ``timeout_sec`` seconds.

    ``cmd`` is split with ``shlex.split``.  On Python 3.5 and newer the
    decoded (UTF-8) stdout and stderr are returned.  Older interpreters use a
    timer that kills the process and return whatever ``communicate()``
    produced, without decoding.

    Raises:
        EslException: the command was stopped by the time limit.
    """
    argv = shlex.split(cmd)

    if sys.version_info < (3, 5):
        # Legacy path: subprocess.run() is not available here, so a timer
        # thread kills the process when the limit is reached.
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        timer = Timer(timeout_sec, lambda p: p.kill(), [proc])

        try:
            timer.start()
            stdout, stderr = proc.communicate()
        finally:
            timer.cancel()
            # Return code -9 is taken as the sign of the timer's kill().
            if proc.poll() == -9:
                raise EslException('Fitting probably got killed by timeout!')
            return stdout, stderr

    try:
        completed = subprocess.run(argv, timeout=timeout_sec,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        message = 'Fitting got killed by timeout after '+str(timeout_sec)+' sec!'
        print (message)
        raise EslException(message)
    return completed.stdout.decode('utf-8'), completed.stderr.decode('utf-8')
test_subprocess.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_communicate_timeout(self):
        # The child writes one line to stderr, sleeps a second, writes a
        # second line and finally echoes its stdin to stdout.
        child_code = ('import sys,os,time;'
                      'sys.stderr.write("pineapple\\n");'
                      'time.sleep(1);'
                      'sys.stderr.write("pear\\n");'
                      'sys.stdout.write(sys.stdin.read())')
        p = subprocess.Popen([sys.executable, "-c", child_code],
                             universal_newlines=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # The first call must give up after 0.3 seconds.
        with self.assertRaises(subprocess.TimeoutExpired):
            p.communicate("banana", timeout=0.3)
        # A second call without a timeout must still deliver the complete
        # output of the child.
        stdout, stderr = p.communicate()
        self.assertEqual(stdout, "banana")
        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
test_subprocess.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_communicate_timeout_large_ouput(self):
        # A timeout must be reported while the child is still producing a
        # large amount of output: four 64 KiB chunks, 0.2 s apart.
        write_chunk = 'sys.stdout.write("a" * (64 * 1024));'
        pause = 'time.sleep(0.2);'
        child_code = ('import sys,os,time;'
                      + (write_chunk + pause) * 3
                      + write_chunk)
        p = subprocess.Popen([sys.executable, "-c", child_code],
                             stdout=subprocess.PIPE)
        with self.assertRaises(subprocess.TimeoutExpired):
            p.communicate(timeout=0.4)
        # The follow-up call has to return everything the child wrote.
        stdout, _ = p.communicate()
        self.assertEqual(len(stdout), 4 * 64 * 1024)

    # Test for the fd leak reported in http://bugs.python.org/issue2791.
utils.py 文件源码 项目:simple-ostinato 作者: little-dude 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def kill_drone():
    """Stop the global ``DRONE`` process and clear ``DRONE_RUNNING``.

    Returns immediately (with a warning) when ``DRONE_RUNNING`` is False.
    Otherwise the process is asked to terminate and given 10 seconds; if it
    is still alive it is killed and waited for another 10 seconds.
    """
    global DRONE_RUNNING
    LOG.info('stopping drone')
    if DRONE_RUNNING is False:
        LOG.warning('drone is not running, nothing to do')
        return

    LOG.info('trying to stop drone gracefully')
    DRONE.terminate()
    try:
        DRONE.wait(timeout=10)
    except subprocess.TimeoutExpired:
        LOG.info('could not terminate drone properly, kill it.')
        DRONE.kill()
        DRONE.wait(timeout=10)
        LOG.info('drone has been killed')
    else:
        LOG.info('drone exited gracefully')
    DRONE_RUNNING = False


问题


面经


文章

微信
公众号

扫码关注公众号