python类execvp()的实例源码

cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_bg(cmd, debug=False, cwd=''):
    # make sure 'cmd' is a list of strings
    if type(cmd) in [str, unicode]:
        cmd = [c for c in cmd.split() if c != '']
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        ( child_pid, child_fd ) = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception, e:
            raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
    else:
        return child_pid, child_fd
cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run_bg(cmd, debug=False, cwd=''):
    # make sure 'cmd' is a list of strings
    if type(cmd) in [str, unicode]:
        cmd = [c for c in cmd.split() if c != '']
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        ( child_pid, child_fd ) = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception, e:
            raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
    else:
        return child_pid, child_fd
manage.py 文件源码 项目:circleci-demo-python-flask 作者: CircleCI-Public 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    import xmlrunner
    tests = unittest.TestLoader().discover('tests')
    # run tests with unittest-xml-reporting and output to $CIRCLE_TEST_REPORTS on CircleCI or test-reports locally
    xmlrunner.XMLTestRunner(output=os.environ.get('CIRCLE_TEST_REPORTS','test-reports')).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir
    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace
    # TODO: switch to a new UTS namespace, change hostname to container_id
    # HINT: use linux.sethostname()

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.sethostname(container_id)  # change hostname to container_id

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace
    linux.unshare(linux.CLONE_NEWUTS)  # switch to a new UTS namespace
    linux.sethostname(container_id)  # change hostname to container_id

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir

    os.execvp(command[0], command)
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def create_child(*args):
    parentfp, childfp = socket.socketpair()
    pid = os.fork()
    if not pid:
        mitogen.core.set_block(childfp.fileno())
        os.dup2(childfp.fileno(), 0)
        os.dup2(childfp.fileno(), 1)
        childfp.close()
        parentfp.close()
        os.execvp(args[0], args)

    childfp.close()
    # Decouple the socket from the lifetime of the Python socket object.
    fd = os.dup(parentfp.fileno())
    parentfp.close()

    LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
              pid, fd, os.getpid(), Argv(args))
    return pid, fd
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def tty_create_child(*args):
    master_fd, slave_fd = os.openpty()
    disable_echo(master_fd)
    disable_echo(slave_fd)

    pid = os.fork()
    if not pid:
        mitogen.core.set_block(slave_fd)
        os.dup2(slave_fd, 0)
        os.dup2(slave_fd, 1)
        os.dup2(slave_fd, 2)
        close_nonstandard_fds()
        os.setsid()
        os.close(os.open(os.ttyname(1), os.O_RDWR))
        os.execvp(args[0], args)

    os.close(slave_fd)
    LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
              pid, master_fd, os.getpid(), Argv(args))
    return pid, master_fd
manage.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
play.py 文件源码 项目:witchcraft 作者: llllllllll 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def play(conn, query):
    """Launch mpv with the results of the query.

    Parameters
    ----------
    conn : sa.engine.Connection
        The connection to the metadata database.
    query : string
        The witchcraft ql query to run against the database.

    Notes
    -----
    This function never returns.
    """
    select, extra_args = ql.compile(query)
    paths = [p[0] for p in conn.execute(select).fetchall()]

    if not paths:
        # nothing to play, mpv doesn't want an empty path list
        return

    os.execvp('mpv', ['mpv', '--no-video'] + extra_args + paths)
manage.py 文件源码 项目:Plog 作者: thundernet8 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """
    ??
    :param coverage ????????
    :return:
    """
    if coverage and not os.environ.get('PG_COVERAGE'):
        import sys
        os.environ['PG_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tests/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
manage.py 文件源码 项目:PilosusBot 作者: pilosus 项目源码 文件源码 阅读 83 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
admin.py 文件源码 项目:fritzchecksum 作者: mementum 项目源码 文件源码 阅读 68 收藏 0 点赞 0 评论 0
def RunAsAdminUnixLike():
    # FIXME ... what happens if "frozen"
    script = os.path.abspath(sys.argv[0])

    params = [sys.executable]
    if sys.executable != script:
        params.append(script)
        # CHECK Seems logic to always extend
        params.extend(sys.argv[1:])

    # FIXME ... execute the command directly
    try:
        os.execvp('sudo', params)
    except Exception, e:
        return False, str(e)

    # If execvp was ok ... this will never be reached
    return True, None
disco.py 文件源码 项目:layercake 作者: bcsaller 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(False)
    options = setup()
    configure_logging(options.log_level)
    config = configure_from_file(options.conf)
    config.update(configure_from_env())
    r = reactive.Reactive(config, loop=loop)
    r.find_rules()
    r.find_schemas()
    try:
        config_task = loop.create_task(r())
        loop.run_until_complete(config_task)
        if config_task.result() is True:
            # Fork/Exec cmd
            log.info("Container Configured")
            log.info("Exec {}".format(options.cmd))
            os.execvp(options.cmd[0], options.cmd)
        else:
            log.critical("Unable to configure container, see log or run with -l DEBUG")
    finally:
        loop.close()
manage.py 文件源码 项目:SweetHeart 作者: haojunyu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test(coverage=False):
  # run the test unit
  if coverage and not os.environ.get('FLASK_COVERAGE'):
    import sys
    os.environ['FLASK_COVERAGE'] = '1'
    os.execvp(sys.executable, [sys.executable] + sys.argv)
  import unittest
  tests = unittest.TestLoader().discover('tests')
  unittest.TextTestRunner(verbosity=2).run(tests)
  if COV:
    COV.stop()
    COV.save()
    print('Coverage Summary:')
    COV.report()
    basedir = os.path.abspath(os.path.dirname(__file__))
    covdir = os.path.join(basedir, 'tmp/coverage')
    COV.html_report(directory=covdir)
    print('HTML version: file://%s/index.html' % covdir)
    COV.erase()
osal.py 文件源码 项目:device-cloud-python 作者: Wind-River 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def execl(*args):
    """
    Replaces the current process with a new instance of the specified
    executable. This function will only return if there is an issue starting the
    new instance, in which case it will return false. Otherwise, it will not
    return.
    """
    retval = EXECUTION_FAILURE

    if POSIX:
        os.execvp(args[0], args)
    elif WIN32:
        os.execvp(sys.executable, args)
    else:
        retval = NOT_SUPPORTED

    return retval
kalipi.py 文件源码 项目:Kali-Pi 作者: Re4son 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def screen_on(retPage, backlightControl):

    if os.environ["KPPIN"] != "1":
        launch_bg=os.environ["MENUDIR"] + "launch-bg.sh"
        process = subprocess.call(launch_bg, shell=True)

    pygame.quit()
    if backlightControl == "3.5r":
        process = subprocess.call("echo '1' > /sys/class/backlight/soc\:backlight/brightness", shell=True)
    elif backlightControl == "4dpi-24":
        process = subprocess.call("echo '80' > /sys/class/backlight/24-hat-pwm/brightness", shell=True)
    else:
        backlight = GPIO.PWM(18, 1023)
        backlight.start(100)
        GPIO.cleanup()
    if os.environ["KPPIN"] == "1":
        page=os.environ["MENUDIR"] + "menu-pin.py"
        args = [page, retPage]
    else:
        page=os.environ["MENUDIR"] + retPage
        args = [page]

    os.execvp("python", ["python"] + args)

# Turn screen off
utils.py 文件源码 项目:uwsgiconf 作者: idlesign 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def spawn(self, filepath, configuration_alias, replace=False):
        """Spawns uWSGI using the given configuration module.

        :param str|unicode filepath:

        :param str|unicode configuration_alias:

        :param bool replace: Whether a new process should replace current one.

        """
        # Pass --conf as an argument to have a chance to use
        # touch reloading form .py configuration file change.
        args = ['uwsgi', '--ini', 'exec://%s %s --conf %s' % (self.binary_python, filepath, configuration_alias)]

        if replace:
            return os.execvp('uwsgi', args)

        return os.spawnvp(os.P_NOWAIT, 'uwsgi', args)
cmd_tests.py 文件源码 项目:flask-api-boilerplate 作者: mikaelm1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cli(cov):
    """
    Runs all the tests.
    :param cov: bool Set to True to get coverage report
    """
    if cov and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    tests = unittest.TestLoader().discover('app.tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname('__file__'))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
ssh.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ssh(args):
    prefix, at, name = args.name.rpartition("@")
    instance = resources.ec2.Instance(resolve_instance_id(name))
    if not instance.public_dns_name:
        msg = "Unable to resolve public DNS name for {} (state: {})"
        raise AegeaException(msg.format(instance, getattr(instance, "state", {}).get("Name")))

    tags = {tag["Key"]: tag["Value"] for tag in instance.tags or []}
    ssh_host_key = tags.get("SSHHostPublicKeyPart1", "") + tags.get("SSHHostPublicKeyPart2", "")
    if ssh_host_key:
        # FIXME: this results in duplicates.
        # Use paramiko to detect if the key is already listed and not insert it then (or only insert if different)
        add_ssh_host_key_to_known_hosts(instance.public_dns_name + " " + ssh_host_key + "\n")
    ssh_args = ["ssh", prefix + at + instance.public_dns_name]
    if not (prefix or at):
        try:
            ssh_args += ["-l", resources.iam.CurrentUser().user.name]
        except:
            logger.info("Unable to determine IAM username, using local username")
    os.execvp("ssh", ssh_args + args.ssh_args)
manage.py 文件源码 项目:url-shortener-api 作者: mabbie 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Runs the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
ssh.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _wait_for_ssh(queue, ssh, command, timeout=1, attempts=40):
    """Wait until a successful connection to the ssh endpoint can be made."""
    try:
        host, port = queue.get(timeout=timeout * attempts)
    except g_queue.Empty:
        cli.bad_exit("No SSH endpoint found.")

    for _ in six.moves.range(attempts):
        _LOGGER.debug('Checking SSH endpoint %s:%s', host, port)
        if checkout.connect(host, port):
            run_ssh(host, port, ssh, list(command))
            break  # if run_ssh doesn't end with os.execvp()...

        try:
            host, port = queue.get(timeout=timeout)
            queue.task_done()
        except g_queue.Empty:
            pass

    # Either all the connection attempts failed or we're after run_ssh
    # (not resulting in os.execvp) so let's "clear the queue" so the thread
    # can join
    queue.task_done()
utils.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def sane_execvp(filename, args, close_fds=True, restore_signals=True):
    """Execute a new program with sanitized environment.
    """
    def _restore_signals():
        """Reset the default behavior to all signals.
        """
        for i in _SIGNALS:
            signal.signal(i, signal.SIG_DFL)

    def _close_fds():
        """Close all file descriptors except 0, 1, 2.
        """
        os.closerange(3, subprocess.MAXFD)

    if close_fds:
        _close_fds()
    if restore_signals:
        _restore_signals()
    os.execvp(filename, args)
manage.py 文件源码 项目:cci-demo-flask 作者: circleci 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    import sys
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    import xmlrunner
    tests = unittest.TestLoader().discover('tests')
    results = xmlrunner.XMLTestRunner(output='test-reports').run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'test-reports/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
    if (len(results.failures) > 0 or len(results.errors) > 0):
        sys.exit(1)
workers.py 文件源码 项目:jack 作者: jack-cli-cd-ripper 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start_new_process(args, nice_value=0):
    "start a new process in a pty and renice it"
    data = {}
    data['start_time'] = time.time()
    pid, master_fd = pty.fork()
    if pid == CHILD:
        default_signals()
        if nice_value:
            os.nice(nice_value)
        os.execvp(args[0], [a.encode(cf['_charset'], "replace") for a in args])
    else:
        data['pid'] = pid
        if os.uname()[0] == "Linux":
            fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
        data['fd'] = master_fd
        data['file'] = os.fdopen(master_fd)
        data['cmd'] = args
        data['buf'] = ""
        data['otf'] = 0
        data['percent'] = 0
        data['elapsed'] = 0
        return data
manage.py 文件源码 项目:project 作者: Junctionzc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test(coverage = False):
    """Run the unit tests"""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity = 2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory = covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
gpginterface.py 文件源码 项目:alicloud-duplicity 作者: aliyun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _as_child(self, process, gnupg_commands, args):
        """Stuff run after forking in child"""
        # child
        for std in _stds:
            p = process._pipes[std]
            os.dup2(p.child, getattr(sys, "__%s__" % std).fileno())

        for k, p in process._pipes.items():
            if p.direct and k not in _stds:
                # we want the fh to stay open after execing
                fcntl.fcntl(p.child, fcntl.F_SETFD, 0)

        fd_args = []

        for k, p in process._pipes.items():
            # set command-line options for non-standard fds
            if k not in _stds:
                fd_args.extend([_fd_options[k], "%d" % p.child])

            if not p.direct:
                os.close(p.parent)

        command = [self.call] + fd_args + self.options.get_args() + gnupg_commands + args

        os.execvp(command[0], command)
run.py 文件源码 项目:Graduation_design 作者: mr-betterman 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
job_client.py 文件源码 项目:ava-capture 作者: electronicarts 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def loop_forever(self):
        try:
            while 1:
                try:
                    self.status_changed = False
                    self.phone_home()

                except Exception as e:
                    self.logger.error('Failed to contact server. %s' % e)

                if self.need_restart and not self.container.running_jobs and not self.container.finished_jobs:
                    print 'Restarting...'
                    # End current process and launch a new one with the same command line parameters
                    #os.execvp('python.exe', ['python.exe'] + sys.argv)
                    exit(3)

                if not self.status_changed:
                    self.cpu_percent = psutil.cpu_percent(interval=self.PHONEHOME_DELAY)

        except KeyboardInterrupt:
            pass
        finally:
            print 'Exiting...'
            self.notify_exit()


问题


面经


文章

微信
公众号

扫码关注公众号