python类environ()的实例源码

optparse.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self,
             indent_increment,
             max_help_position,
             width,
             short_first):
    """Initialise the layout state of this object.

    indent_increment, max_help_position and short_first are stored
    unchanged.  When ``width`` is None it is derived from the COLUMNS
    environment variable (80 if that is missing or not an integer),
    minus 2; an explicit ``width`` is stored as given.
    """
    self.parser = None
    self.indent_increment = indent_increment
    self.help_position = max_help_position
    self.max_help_position = max_help_position

    if width is None:
        try:
            columns = int(os.environ['COLUMNS'])
        except (KeyError, ValueError):
            # COLUMNS unset or not numeric: fall back to 80 columns.
            columns = 80
        width = columns - 2
    self.width = width

    self.current_indent = 0
    self.level = 0
    self.help_width = None          # computed later
    self.short_first = short_first
    self.default_tag = "%default"
    self.option_strings = {}
    self._short_opt_fmt = "%s %s"
    self._long_opt_fmt = "%s=%s"
sysconfig.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _getuserbase():
    """Return the base directory for per-user installations.

    A non-empty IRONPYTHONUSERBASE environment variable wins on every
    platform.  Otherwise the result is:

    * on ``os.name == "nt"``: ``<APPDATA>/Python`` (``~/Python`` when
      APPDATA is unset or empty);
    * on darwin, when the PYTHONFRAMEWORK config var is set:
      ``~/Library/<framework>/<major>.<minor>``;
    * everywhere else: ``~/.local``.

    ``~`` is expanded with ``os.path.expanduser``.
    """
    def joinuser(*args):
        return os.path.expanduser(os.path.join(*args))

    env_base = os.environ.get("IRONPYTHONUSERBASE")
    if env_base:
        # Honour the override uniformly.  Previously the darwin
        # framework branch ignored it while the nt and generic
        # branches respected it.
        return env_base

    # what about 'os2emx', 'riscos' ?
    if os.name == "nt":
        base = os.environ.get("APPDATA") or "~"
        return joinuser(base, "Python")

    if sys.platform == "darwin":
        framework = get_config_var("PYTHONFRAMEWORK")
        if framework:
            return joinuser("~", "Library", framework,
                            "%d.%d" % sys.version_info[:2])

    return joinuser("~", ".local")
platform.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def path(klass):
    """Return the PATH environment variable split on ``os.pathsep``.

    Raises KeyError when PATH is not set.
    """
    search_path = os.environ['PATH']
    return search_path.split(os.pathsep)
db.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, config):
    """Open the database selected by ``config.db`` and create a cursor.

    Only the "sqlite" backend is implemented; the database file is
    taken from the PIPELINES_DB environment variable.

    Raises:
        NotImplementedError: ``config.db`` is "mysql" (not supported yet).
        ValueError: ``config.db`` names any other backend.
        KeyError: PIPELINES_DB is not set for the sqlite backend.
    """
    if config.db == "sqlite":
        self._dbConn = sqlite3.connect(os.environ["PIPELINES_DB"])

    elif config.db == "mysql":
        # TODO: determine best production grade relational database to use
        # Fail loudly here: without a connection the cursor() call below
        # would die with an unrelated-looking AttributeError.
        raise NotImplementedError("the 'mysql' database backend is not supported yet")

    else:
        raise ValueError("unsupported database backend: %r" % (config.db,))

    self._pipelinesDb = self._dbConn.cursor()
utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def editPipeline(args, config):
    """Let the user edit a job's stored request in an external editor.

    The request JSON for ``args.jobId`` is read through PipelineDbUtils,
    written pretty-printed to a temporary file and opened in the editor
    named by the EDITOR environment variable (``/usr/bin/nano`` when
    EDITOR is unset or empty).  If the editor exits with status 0 the
    edited JSON is stored back on the job; otherwise an error is printed
    and the process exits with status -1.  The temporary file is removed
    in every case.
    """
    pipelineDbUtils = PipelineDbUtils(config)

    request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)

    fd, tmp = mkstemp()
    try:
        # os.fdopen takes ownership of the descriptor returned by mkstemp,
        # so it is closed with the file instead of being leaked.
        with os.fdopen(fd, 'w') as f:
            f.write(json.dumps(request, indent=4))

        editor = os.environ.get("EDITOR") or "/usr/bin/nano"

        if subprocess.call([editor, tmp]) != 0:
            # Single-argument print() behaves the same on Python 2 and 3.
            print("ERROR: there was a problem editing the request")
            exit(-1)

        with open(tmp, 'r') as f:
            request = json.load(f)
    finally:
        # Do not leave the request lying around in the temp directory.
        os.remove(tmp)

    pipelineDbUtils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
unitdata.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, path=None):
    """Open the SQLite database at ``path`` and run ``self._init()``.

    When ``path`` is None the location is taken from the UNIT_STATE_DB
    environment variable; if that is unset it falls back to
    ``.unit-state.db`` inside CHARM_DIR (or a relative path when
    CHARM_DIR is unset).
    """
    location = path
    if location is None:
        location = os.environ.get('UNIT_STATE_DB')
        if location is None:
            charm_root = os.environ.get('CHARM_DIR', '')
            location = os.path.join(charm_root, '.unit-state.db')
    self.db_path = location

    self.conn = sqlite3.connect('%s' % self.db_path)
    self.cursor = self.conn.cursor()
    self.revision = None
    self._closed = False
    self._init()
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def execution_environment():
    """Collect the current execution context into a single dict.

    Always contains 'conf', 'unit', 'rels' and 'env'; 'reltype', 'relid'
    and 'rel' are added only when relation_id() returns a truthy value.
    """
    context = {'conf': config()}
    if relation_id():
        context.update({
            'reltype': relation_type(),
            'relid': relation_id(),
            'rel': relation_get(),
        })
    context['unit'] = local_unit()
    context['rels'] = relations()
    context['env'] = os.environ
    return context
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def relation_type():
    """Return the JUJU_RELATION environment variable, or None if unset."""
    return os.environ.get('JUJU_RELATION')
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def relation_id(relation_name=None, service_or_unit=None):
    """Return the relation ID for the current or a specified relation.

    With no arguments, returns the JUJU_RELATION_ID environment variable
    (None if unset).  With both arguments, returns the first ID from
    relation_ids(relation_name) whose remote service name equals the
    service part of ``service_or_unit`` (the text before the first '/'),
    or None when nothing matches.

    Raises ValueError when exactly one of the two arguments is given.
    """
    if not (relation_name or service_or_unit):
        return os.environ.get('JUJU_RELATION_ID')

    if not (relation_name and service_or_unit):
        raise ValueError('Must specify neither or both of relation_name and service_or_unit')

    wanted_service = service_or_unit.split('/')[0]
    for candidate in relation_ids(relation_name):
        if remote_service_name(candidate) == wanted_service:
            return candidate
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def local_unit():
    """Return the local unit ID from JUJU_UNIT_NAME (KeyError if unset)."""
    unit_name = os.environ['JUJU_UNIT_NAME']
    return unit_name
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def hook_name():
    """Return the name of the currently executing hook.

    Uses JUJU_HOOK_NAME when set, otherwise the basename of sys.argv[0].
    """
    script_name = os.path.basename(sys.argv[0])
    return os.environ.get('JUJU_HOOK_NAME', script_name)
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def charm_dir():
    """Return the CHARM_DIR environment variable, or None if unset."""
    root = os.environ.get('CHARM_DIR', None)
    return root
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def action_name():
    """Return the JUJU_ACTION_NAME environment variable, or None if unset."""
    return os.getenv('JUJU_ACTION_NAME')
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def action_uuid():
    """Return the JUJU_ACTION_UUID environment variable, or None if unset."""
    return os.getenv('JUJU_ACTION_UUID')
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def action_tag():
    """Return the JUJU_ACTION_TAG environment variable, or None if unset."""
    return os.getenv('JUJU_ACTION_TAG')
ufw.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def enable(soft_fail=False):
    """
    Enable ufw

    Returns immediately when ufw is already enabled. Otherwise IPv6
    support is disabled first if is_ipv6_ok(soft_fail) reports it is not
    usable, then `ufw enable` is run and its output is checked for the
    expected confirmation line.

    :param soft_fail: If set to True silently disables IPv6 support in ufw,
                      otherwise a UFWIPv6Error exception is raised when IP6
                      support is broken.
    :returns: True if ufw is successfully enabled
    """
    if is_enabled():
        return True

    if not is_ipv6_ok(soft_fail):
        disable_ipv6()

    # Minimal environment: LANG is pinned so the output matches the
    # English confirmation message checked below.
    command_env = {'LANG': 'en_US', 'PATH': os.environ['PATH']}
    output = subprocess.check_output(['ufw', 'enable'],
                                     universal_newlines=True,
                                     env=command_env)

    confirmations = re.findall(
        '^Firewall is active and enabled on system startup\n',
        output, re.M)
    hookenv.log(output, level='DEBUG')

    if confirmations:
        hookenv.log("ufw enabled", level='INFO')
        return True

    hookenv.log("ufw couldn't be enabled", level='WARN')
    return False
ufw.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def disable():
    """
    Disable ufw

    Returns immediately when ufw is already disabled. Otherwise runs
    `ufw disable` and checks its output for the expected confirmation
    line.

    :returns: True if ufw is successfully disabled
    """
    if not is_enabled():
        return True

    # Minimal environment: LANG is pinned so the output matches the
    # English confirmation message checked below.
    command_env = {'LANG': 'en_US', 'PATH': os.environ['PATH']}
    output = subprocess.check_output(['ufw', 'disable'],
                                     universal_newlines=True,
                                     env=command_env)

    confirmations = re.findall(
        r'^Firewall stopped and disabled on system startup\n',
        output, re.M)
    hookenv.log(output, level='DEBUG')

    if confirmations:
        hookenv.log("ufw disabled", level='INFO')
        return True

    hookenv.log("ufw couldn't be disabled", level='WARN')
    return False
ufw.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def default_policy(policy='deny', direction='incoming'):
    """
    Changes the default policy for traffic `direction`

    Runs `ufw default <policy> <direction>` and checks the output for
    the expected confirmation line.

    :param policy: allow, deny or reject
    :param direction: traffic direction, possible values: incoming, outgoing,
                      routed
    :returns: True if ufw confirmed the change, False otherwise
    :raises UFWError: if `policy` or `direction` is not a known value
    """
    if policy not in ('allow', 'deny', 'reject'):
        raise UFWError(('Unknown policy %s, valid values: '
                        'allow, deny, reject') % policy)

    if direction not in ('incoming', 'outgoing', 'routed'):
        raise UFWError(('Unknown direction %s, valid values: '
                        'incoming, outgoing, routed') % direction)

    # Minimal environment: LANG is pinned so the output matches the
    # English confirmation message checked below.
    command_env = {'LANG': 'en_US', 'PATH': os.environ['PATH']}
    output = subprocess.check_output(['ufw', 'default', policy, direction],
                                     universal_newlines=True,
                                     env=command_env)
    hookenv.log(output, level='DEBUG')

    expected = "^Default %s policy changed to '%s'\n" % (direction, policy)
    if re.findall(expected, output, re.M):
        hookenv.log("ufw default policy for %s changed to %s"
                    % (direction, policy), level='INFO')
        return True

    hookenv.log("ufw couldn't change the default policy to %s for %s"
                % (policy, direction), level='WARN')
    return False
execd.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def default_execd_dir():
    """Return the ``exec.d`` path under CHARM_DIR (KeyError if unset)."""
    charm_root = os.environ['CHARM_DIR']
    return os.path.join(charm_root, 'exec.d')
main.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run():
    """Entry point: read the CNI config from stdin and run the CNI runner.

    Initialises configuration and logging from the parsed config, picks
    the daemonized or standalone runner according to
    CONF.cni_daemon.daemon_enabled, arms a SIGALRM timeout of
    _CNI_TIMEOUT seconds and runs the runner with the process
    environment. Exits with the runner's status when it is truthy.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    # A config without a 'debug' attribute is treated as debug off.
    if getattr(cni_conf, 'debug', False):
        args.append('-d')
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    runner = (cni_api.CNIDaemonizedRunner()
              if CONF.cni_daemon.daemon_enabled
              else cni_api.CNIStandaloneRunner(K8sCNIPlugin()))
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _on_alarm(signum, frame):
        # Report the timeout on stdout, then abort.
        timeout_reply = {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        }
        runner._write_dict(sys.stdout, timeout_reply)
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(_CNI_TIMEOUT)
    exit_status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", exit_status)
    if exit_status:
        sys.exit(exit_status)


问题


面经


文章

微信
公众号

扫码关注公众号