python类execvp()的实例源码

TestCmd.py 文件源码 项目:gyp 作者: electron 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
                    p2cread, p2cwrite = os.pipe()
                    c2pread, c2pwrite = os.pipe()
                    self.pid = os.fork()
                    if self.pid == 0:
                        # Child
                        os.dup2(p2cread, 0)
                        os.dup2(c2pwrite, 1)
                        os.dup2(c2pwrite, 2)
                        for i in range(3, popen2.MAXFD):
                            try:
                                os.close(i)
                            except: pass
                        try:
                            os.execvp(cmd[0], cmd)
                        finally:
                            os._exit(1)
                        # Shouldn't come here, I guess
                        os._exit(1)
                    os.close(p2cread)
                    self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
                    os.close(c2pwrite)
                    self.fromchild = os.fdopen(c2pread, 'r', bufsize)
                    popen2._active.append(self)
manage.py 文件源码 项目:flask-blog 作者: zhuwei05 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()

# source code profile
manage.py 文件源码 项目:blog 作者: hukaixuan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests"""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        import sys
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
manage.py 文件源码 项目:rest_api 作者: opentargets 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)
    if COV:
        COV.stop()
        COV.save()
        print('Coverage Summary:')
        COV.report()
        basedir = os.path.abspath(os.path.dirname(__file__))
        covdir = os.path.join(basedir, 'tmp/coverage')
        COV.html_report(directory=covdir)
        print('HTML version: file://%s/index.html' % covdir)
        COV.erase()
manage.py 文件源码 项目:livetv_mining 作者: taogeT 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test(coverage=False):
    """Run the unit tests."""
    if coverage and not os.environ.get('FLASK_COVERAGE'):
        os.environ['FLASK_COVERAGE'] = '1'
        os.execvp(sys.executable, [sys.executable] + sys.argv)
    testresult = TextTestRunner(verbosity=2).run(TestLoader().discover('tests'))
    if cov:
        cov.stop()
        cov.save()
        print('Coverage Summary:')
        cov.report()
        covdir = app.config.get('COVERAGE_DIRECTORY', '')
        if covdir:
            covdir = os.path.join(covdir, datetime.utcnow().strftime('%Y-%m-%d-%H-%M-%S'))
            if not os.path.exists(covdir):
                os.makedirs(covdir)
            cov.html_report(directory=covdir)
            print('Coverage HTML version: file://{}/index.html'.format(covdir))
        cov.erase()
    if len(testresult.failures) + len(testresult.errors) > 0:
        sys.exit(1)
zephyr_mirror_backend.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def maybe_restart_mirroring_script():
    # type: () -> None
    if os.stat(os.path.join(options.stamp_path, "stamps", "restart_stamp")).st_mtime > start_time or \
            ((options.user == "tabbott" or options.user == "tabbott/extra") and
             os.stat(os.path.join(options.stamp_path, "stamps", "tabbott_stamp")).st_mtime > start_time):
        logger.warning("")
        logger.warning("zephyr mirroring script has been updated; restarting...")
        maybe_kill_child()
        try:
            zephyr._z.cancelSubs()
        except IOError:
            # We don't care whether we failed to cancel subs properly, but we should log it
            logger.exception("")
        while True:
            try:
                os.execvp(os.path.abspath(__file__), sys.argv)
            except Exception:
                logger.exception("Error restarting mirroring script; trying again... Traceback:")
                time.sleep(1)
popen2.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _run_child(self, cmd):
        if isinstance(cmd, basestring):
            cmd = ['/bin/sh', '-c', cmd]
        os.closerange(3, MAXFD)
        try:
            os.execvp(cmd[0], cmd)
        finally:
            os._exit(1)
external_search_command.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _execute(path, argv, environ):
            if environ is None:
                os.execvp(path, argv)
            else:
                os.execvpe(path, argv, environ)
            return

    # endregion
external_search_command.py 文件源码 项目:Splunk_CBER_App 作者: MHaggis 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _execute(path, argv, environ):
            if environ is None:
                os.execvp(path, argv)
            else:
                os.execvpe(path, argv, environ)
            return

    # endregion
popen2.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def _run_child(self, cmd):
        if isinstance(cmd, basestring):
            cmd = ['/bin/sh', '-c', cmd]
        os.closerange(3, MAXFD)
        try:
            os.execvp(cmd[0], cmd)
        finally:
            os._exit(1)
manage.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def main():
    """Parse options and call the appropriate class/method."""
    CONF.register_cli_opt(category_opt)
    script_name = sys.argv[0]
    if len(sys.argv) < 2:
        print(_("\nOpenStack meteos version: %(version)s\n") %
              {'version': version.version_string()})
        print(script_name + " category action [<args>]")
        print(_("Available categories:"))
        for category in CATEGORIES:
            print("\t%s" % category)
        sys.exit(2)

    try:
        log.register_options(CONF)
        CONF(sys.argv[1:], project='meteos',
             version=version.version_string())
        log.setup(CONF, "meteos")
    except cfg.ConfigFilesNotFoundError:
        cfgfile = CONF.config_file[-1] if CONF.config_file else None
        if cfgfile and not os.access(cfgfile, os.R_OK):
            st = os.stat(cfgfile)
            print(_("Could not read %s. Re-running with sudo") % cfgfile)
            try:
                os.execvp('sudo', ['sudo', '-u', '#%s' % st.st_uid] + sys.argv)
            except Exception:
                print(_('sudo failed, continuing as if nothing happened'))

        print(_('Please re-run meteos-manage as root.'))
        sys.exit(2)

    fn = CONF.category.action_fn

    fn_args = fetch_func_args(fn)
    fn(*fn_args)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    # TODO: would you like to do something before chrooting?
    # print('Created a new root fs for our container: {}'.format(new_root))

    # TODO: chroot into new_root
    # TODO: something after chrooting?

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace

    # TODO: we added MS_REC here. wanna guess why?
    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # Create mounts (/proc, /sys, /dev) under new_root
    linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '')
    linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '')
    linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs',
                linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755')

    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')

    makedev(os.path.join(new_root, 'dev'))

    os.chroot(new_root)  # TODO: replace with pivot_root

    os.chdir('/')

    # TODO: umount2 old root (HINT: see MNT_DETACH in man 2 umount)

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # TODO: time to say goodbye to the old mount namespace,
    #       see "man 2 unshare" to get some help
    #   HINT 1: there is no os.unshare(), time to use the linux module we made
    #           just for you!
    #   HINT 2: the linux module includes both functions and constants!
    #           e.g. linux.CLONE_NEWNS

    # TODO: remember shared subtrees?
    # (https://www.kernel.org/doc/Documentation/filesystems/sharedsubtree.txt)
    # Make / a private mount to avoid littering our host mount table.

    # Create mounts (/proc, /sys, /dev) under new_root
    linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '')
    linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '')
    linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs',
                linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755')
    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')
    for i, dev in enumerate(['stdin', 'stdout', 'stderr']):
        os.symlink('/proc/self/fd/%d' % i, os.path.join(new_root, 'dev', dev))

    # TODO: add more devices (e.g. null, zero, random, urandom) using os.mknod.

    os.chroot(new_root)

    os.chdir('/')

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir,
            cpu_shares, memory, memory_swap):
    _setup_cpu_cgroup(container_id, cpu_shares)

    # TODO: similarly to the CPU cgorup, add Memory cgroup support here
    #       setup memory -> memory.limit_in_bytes,
    #       memory_swap -> memory.memsw.limit_in_bytes if they are not None

    linux.sethostname(container_id)  # Change hostname to container_id

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir,
            cpu_shares):
    # TODO: insert the container to a new cpu cgroup named:
    #       'rubber_docker/container_id'

    # TODO: if (cpu_shares != 0)  => set the 'cpu.shares' in our cpu cgroup

    linux.sethostname(container_id)  # change hostname to container_id

    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root') # rmdir the old_root dir

    os.execvp(command[0], command)
docker.py 文件源码 项目:dockwrkr 作者: turbulent 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(container, containerArgs, config, basePath=None, networks=None):
    params = readCreateParameters(
        container, config, basePath=basePath, networks=networks, asList=True)
    if params.isFail():
        return params
    try:
        cmd = [DOCKER_CLIENT, "run", "--rm", "--interactive", "--tty"] + params.getOK() + containerArgs
        logger.debug("EXECVP - %s" % subprocess.list2cmdline(cmd))
        os.execvp(DOCKER_CLIENT, cmd)
    except Exception as ex:
        return Fail(ex)
run_raiden.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main(eth_nodes, seed, raiden_executable, raiden_args):
    if ETH_RPC_ENDPOINT_ARG in raiden_args:
        raise RuntimeError("Argument conflict: {}".format(ETH_RPC_ENDPOINT_ARG))
    eth_nodes = eth_nodes.split(",")
    offset = sum(ord(c) for c in seed) % len(eth_nodes)
    target_eth_node = eth_nodes[offset]
    raiden_args = [raiden_executable] + list(raiden_args) + [ETH_RPC_ENDPOINT_ARG, target_eth_node]
    print(" ".join(raiden_args))
    # Ensure print is flushed - exec could swallow it
    sys.stdout.flush()
    os.execvp(raiden_args[0], raiden_args)
tester.py 文件源码 项目:runtests 作者: bccp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _launch_mpisub(self, args, site_dir):

        # extract the mpirun run argument
        parser = ArgumentParser(add_help=False)
        # these values are ignored. This is a hack to filter out unused argv.
        parser.add_argument("--single", default=False, action='store_true')
        parser.add_argument("--mpirun", default=None)
        _args, additional = parser.parse_known_args()

        # now call with mpirun
        mpirun = args.mpirun.split()
        cmdargs = [sys.executable, sys.argv[0], '--mpisub']

        if site_dir is not None:
            # mpi subs will use system version of package
            cmdargs.extend(['--mpisub-site-dir=' + site_dir])

        # workaround the strict openmpi oversubscribe policy
        # the parameter is found from
        # https://github.com/open-mpi/ompi/blob/ba47f738871ff06b8e8f34b8e18282b9fe479586/orte/mca/rmaps/base/rmaps_base_frame.c#L169
        # see the faq:
        #   https://www.open-mpi.org/faq/?category=running#oversubscribing
        os.environ['OMPI_MCA_rmaps_base_oversubscribe'] = '1'

        os.execvp(mpirun[0], mpirun + cmdargs + additional)

        # if we are here os.execvp has failed; bail
        sys.exit(1)
client.py 文件源码 项目:python-ibmdb-django 作者: ibmdb 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def runshell( self ):
        if ( djangoVersion[0:2] <= ( 1, 0 ) ):
            from django.conf import settings
            database_name = settings.DATABASE_NAME
            database_user = settings.DATABASE_USER
            database_password = settings.DATABASE_PASSWORD
        elif ( djangoVersion[0:2] <= ( 1, 1 ) ):
            settings_dict = self.connection.settings_dict
            database_name = settings_dict['DATABASE_NAME']
            database_user = settings_dict['DATABASE_USER']
            database_password = settings_dict['DATABASE_PASSWORD']
        else:
            settings_dict = self.connection.settings_dict
            database_name = settings_dict['NAME']
            database_user = settings_dict['USER']
            database_password = settings_dict['PASSWORD']

        cmdArgs = ["db2"]

        if ( os.name == 'nt' ):
            cmdArgs += ["db2 connect to %s" % database_name]
        else:
            cmdArgs += ["connect to %s" % database_name]
        if sys.version_info.major >= 3:
            basestring = str
        else:
            basestring = basestring

        if ( isinstance( database_user, basestring ) and 
            ( database_user != '' ) ):
            cmdArgs += ["user %s" % database_user]

            if ( isinstance( database_password, basestring ) and 
                ( database_password != '' ) ):
                cmdArgs += ["using %s" % database_password]

        # db2cmd is the shell which is required to run db2 commands on windows.
        if ( os.name == 'nt' ):
            os.execvp( 'db2cmd', cmdArgs )
        else:
            os.execvp( 'db2', cmdArgs )


问题


面经


文章

微信
公众号

扫码关注公众号