python类CONF的实例源码

client_auth.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def list_opts(group):
        """Build the auth-related config option list for one group.

        Session options are registered on ``CONF`` under ``group`` and
        deep-copied, the first common auth option is placed at the front,
        and every 'password' plugin option whose name is not already in the
        list is appended. The list is finally sorted by option name.

        :param group: group name
        :return: one-element list holding a ``(group, options)`` tuple
        """
        session_opts = ks_loading.register_session_conf_options(CONF, group)
        opts = copy.deepcopy(session_opts)
        opts.insert(0, ks_loading.get_auth_common_conf_options()[0])

        password_opts = ks_loading.get_auth_plugin_conf_options('password')
        for candidate in password_opts:
            # Skip plugin options that would duplicate an existing name.
            if not any(known.name == candidate.name for known in opts):
                opts.append(candidate)

        opts.sort(key=lambda opt: opt.name)
        return [(group, opts)]
client_auth.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _load_auth_plugin(self):
        """Return the auth plugin to use for this client.

        ``self.admin_auth`` wins when it is set. Otherwise a token plugin is
        built from ``self.deprecated_opts_for_v2`` when ``self.url`` contains
        'v2', or from ``self.opts_for_v3`` in every other case, and stored in
        ``self.auth_plugin``.

        :return: the admin auth object or the loaded token plugin
        :raises: ``self.exception_module.Unauthorized`` when no plugin
                 could be loaded
        """
        if self.admin_auth:
            return self.admin_auth

        # NOTE(review): the plugin loaded from the config options here is
        # always replaced by the token plugin below -- confirm this is
        # intended.
        self.auth_plugin = ks_loading.load_auth_from_conf_options(
            CONF, self.group)

        if 'v2' in self.url:
            token_loader, token_opts = v2.Token(), self.deprecated_opts_for_v2
        else:
            token_loader, token_opts = v3.Token(), self.opts_for_v3
        self.auth_plugin = token_loader.load_from_options(**token_opts)

        if not self.auth_plugin:
            msg = _('Cannot load auth plugin for %s') % self.group
            raise self.exception_module.Unauthorized(message=msg)
        return self.auth_plugin
odlc.py 文件源码 项目:gluon 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, backend='neutron'):
        """Set up connection parameters for the OpenDaylight controller.

        Host, port, user and password are read from the ``shim_odl`` config
        group. The REST base URL is derived from ``backend`` and stored in
        ``self.url``; credentials go to ``self.auth`` and the request timeout
        to ``self.timeout``.

        :param backend: 'neutron' or 'restconf'. For any other value
                        ``self.url`` is left unset (unchanged behaviour).
        """
        odl_ip = CONF.shim_odl.odl_host
        odl_port = CONF.shim_odl.odl_port
        user = CONF.shim_odl.odl_user
        passwd = CONF.shim_odl.odl_passwd

        # Pass values as logger arguments so formatting only happens when
        # the record is actually emitted.
        LOG.info("odl_host: %s", odl_ip)
        LOG.info("odl_port: %s", odl_port)
        LOG.info('odl_user: %s', user)
        # Never write the credential itself to the log; mask it instead.
        LOG.info('odl_passwd: %s', '***')

        endpoint = {'ip': odl_ip, 'port': odl_port}
        if backend == 'neutron':
            self.url = ("http://%(ip)s:%(port)s/controller/nb/v2/neutron" %
                        endpoint)
        elif backend == 'restconf':
            self.url = ("http://%(ip)s:%(port)s/restconf/config" %
                        endpoint)
        self.auth = (user, passwd)
        self.timeout = 10
transform_service.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self):
        """Initialise the service and start the periodic leader check.

        Stores the coordinator group from ``CONF.service``, records this
        host's fully qualified domain name, and starts a fixed-interval
        looping call that invokes ``self.periodic_leader_check`` every
        ``CONF.service.election_polling_frequency`` (converted to float).
        """
        super(TransformService, self).__init__()

        self.coordinator = None
        self.group = CONF.service.coordinator_group

        # Unique name used for establishing election candidacy.
        self.my_host_name = socket.getfqdn()

        # Periodic check.
        polling_interval = float(CONF.service.election_polling_frequency)
        periodic_check = loopingcall.FixedIntervalLoopingCall(
            self.periodic_leader_check)
        periodic_check.start(interval=polling_interval)
transform_service.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def when_i_am_elected_leader(self, event):
        """Callback when this host gets elected leader.

        Marks the service as running, logs the election, builds the
        ``spark-submit`` shell command from the ``service`` config group and
        runs it through the shell.

        :param event: election event supplied by the caller; not used here.
        """
        # set running state
        self.previously_running = True

        self.LOG.info("Monasca Transform service running on %s "
                      "has been elected leader", str(self.my_host_name))

        service_conf = CONF.service

        if service_conf.spark_python_files:
            pyfiles = " --py-files %s" % service_conf.spark_python_files
        else:
            pyfiles = ''

        # spark_event_logging_enabled is a string option without a default;
        # fall back to "false" so the join below cannot fail on None.
        event_logging_enabled = (
            service_conf.spark_event_logging_enabled or "false")

        event_logging_dest = ''
        if (service_conf.spark_event_logging_enabled and
                service_conf.spark_event_logging_dest):
            # The leading space keeps this option separate from the
            # preceding "spark.eventLog.enabled=<value>" token.
            event_logging_dest = (
                " --conf spark.eventLog.dir="
                "file://%s" %
                service_conf.spark_event_logging_dest)

        # Build the command to start the Spark driver
        spark_cmd = "".join((
            "export SPARK_HOME=",
            service_conf.spark_home,
            " && ",
            "spark-submit --master ",
            service_conf.spark_master_list,
            " --conf spark.eventLog.enabled=",
            event_logging_enabled,
            event_logging_dest,
            " --jars " + service_conf.spark_jars_list,
            pyfiles,
            " " + service_conf.spark_driver))

        # Start the Spark driver
        # (specify shell=True in order to
        #  correctly handle wildcards in the spark_cmd)
        # NOTE(review): every value above comes from the service
        # configuration and is interpolated into a shell string unescaped.
        subprocess.call(spark_cmd, shell=True)
transform_service.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main_service():
    """Entry point to use when running as an Openstack service.

    Initialises configuration and logging, launches a ``Transform`` service
    through a ``ServiceLauncher`` built on ``cfg.CONF`` and waits on the
    launcher.
    """
    ConfigInitializer.basic_config()
    LogUtils.init_logger(__name__)

    service_launcher = os_service.ServiceLauncher(cfg.CONF)
    transform_service = Transform()
    service_launcher.launch_service(transform_service)
    service_launcher.wait()

# Used if run without Openstack service.
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_database_options():
        """Register the ``database`` option group and its options on cfg.CONF.

        The group carries server type, host, database name, credentials and
        the SSL settings (``use_ssl`` defaults to False).
        """
        database_group = cfg.OptGroup(name='database', title='database')
        database_opts = [
            cfg.StrOpt('server_type'),
            cfg.StrOpt('host'),
            cfg.StrOpt('database_name'),
            cfg.StrOpt('username'),
            cfg.StrOpt('password'),
            cfg.BoolOpt('use_ssl', default=False),
            cfg.StrOpt('ca_file'),
        ]
        cfg.CONF.register_group(database_group)
        cfg.CONF.register_opts(database_opts, group=database_group)
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_service_options():
        """Register the ``service`` option group and its options on cfg.CONF.

        The group covers coordinator settings, logging paths, Spark launch
        parameters, the stream interval and record-store cache settings.
        """
        service_opts = [
            cfg.StrOpt('coordinator_address'),
            cfg.StrOpt('coordinator_group'),
            cfg.FloatOpt('election_polling_frequency'),
            # Boolean default given as a real bool rather than the string
            # 'false', matching how the other BoolOpt defaults are declared.
            cfg.BoolOpt('enable_debug_log_entries', default=False),
            cfg.StrOpt('setup_file'),
            cfg.StrOpt('setup_target'),
            cfg.StrOpt('spark_driver'),
            cfg.StrOpt('service_log_path'),
            cfg.StrOpt('service_log_filename',
                       default='monasca-transform.log'),
            cfg.StrOpt('spark_event_logging_dest'),
            # Kept as a string option: its value is concatenated verbatim
            # into the spark-submit command line.
            cfg.StrOpt('spark_event_logging_enabled'),
            cfg.StrOpt('spark_jars_list'),
            cfg.StrOpt('spark_master_list'),
            cfg.StrOpt('spark_python_files'),
            cfg.IntOpt('stream_interval'),
            cfg.StrOpt('work_dir'),
            cfg.StrOpt('spark_home'),
            cfg.BoolOpt('enable_record_store_df_cache'),
            cfg.StrOpt('record_store_df_cache_storage_level')
        ]
        service_group = cfg.OptGroup(name='service', title='service')
        cfg.CONF.register_group(service_group)
        cfg.CONF.register_opts(service_opts, group=service_group)
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_stage_processors_options():
        """Register the ``stage_processors`` group and its option on cfg.CONF.

        The only option is the boolean ``pre_hourly_processor_enabled``.
        """
        stage_group = cfg.OptGroup(name='stage_processors',
                                   title='stage_processors')
        stage_opts = [cfg.BoolOpt('pre_hourly_processor_enabled')]
        cfg.CONF.register_group(stage_group)
        cfg.CONF.register_opts(stage_opts, group=stage_group)
test_db_util.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_java_db_connection_string_with_ssl(self):
        """Check the JDBC connection string built from cfg.CONF.

        The expected URL includes the ``useSSL`` and ``requireSSL`` flags.
        """
        expected = (
            'jdbc:jdbc_driver://test_ssl_hostname/db_name?user=test_user'
            '&password=pwd&useSSL=True&requireSSL=True')
        actual = DbUtil.get_java_db_connection_string(cfg.CONF)
        self.assertEqual(expected, actual)
service.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """Apply the configured pyroute2 timeout, then run the HTTP server."""
        # NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
        #              make the pyroute2 timeout configurable to make sure
        #              kernel will have chance to catch up.
        pyroute2_timeout = CONF.cni_daemon.pyroute2_timeout
        transactional.SYNC_TIMEOUT = pyroute2_timeout

        # Run HTTP server
        self.server.run()
main.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run():
    """Run the CNI plugin once for the request read from stdin.

    The CNI configuration is parsed from stdin, kuryr configuration and
    logging are initialised, and a daemonized or standalone runner is chosen
    from ``CONF.cni_daemon.daemon_enabled``. A SIGALRM handler reports a
    timeout on stdout and exits with status 1. A truthy runner status is
    used as the process exit status.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    # 'debug' may be missing from the CNI config; treat that as disabled.
    if getattr(cni_conf, 'debug', False):
        args.append('-d')
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    runner = (cni_api.CNIDaemonizedRunner()
              if CONF.cni_daemon.daemon_enabled
              else cni_api.CNIStandaloneRunner(K8sCNIPlugin()))
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _timeout(signum, frame):
        # Report the timeout in the runner's output format, then bail out.
        timeout_reply = {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        }
        runner._write_dict(sys.stdout, timeout_reply)
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)

    exit_status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", exit_status)
    if exit_status:
        sys.exit(exit_status)
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create the daemonized runner and read the port from the config.

        The port is the second ':'-separated field of
        ``CONF.cni_daemon.bind_address``, converted to int.
        """
        super(TestCNIDaemonizedRunner, self).setUp()
        self.runner = api.CNIDaemonizedRunner()
        address_fields = CONF.cni_daemon.bind_address.split(':')
        self.port = int(address_fields[1])
default_subnet.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_subnets(self, service, project_id):
        """Return the subnet configured as the default service subnet.

        :param service: not used by this implementation
        :param project_id: not used by this implementation
        :return: dict mapping the configured subnet id to the result of
                 ``_get_subnet`` for that id
        :raises cfg.RequiredOptError: when
                ``neutron_defaults.service_subnet`` is not set
        """
        service_subnet_id = config.CONF.neutron_defaults.service_subnet

        if service_subnet_id:
            return {service_subnet_id: _get_subnet(service_subnet_id)}

        # NOTE(ivc): this option is only required for
        # DefaultServiceSubnetDriver and its subclasses, but it may be
        # optional for other drivers (e.g. when each namespace has own
        # subnet)
        raise cfg.RequiredOptError('service_subnet',
                                   cfg.OptGroup('neutron_defaults'))
config.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init(args, **kwargs):
    """Parse configuration for the 'kuryr-k8s' project.

    :param args: command-line style argument list handed to ``CONF``
    :param kwargs: extra keyword arguments forwarded to ``CONF`` unchanged
    """
    k8s_version = version.version_info.version_string()
    CONF(args=args, project='kuryr-k8s', version=k8s_version, **kwargs)
config.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setup_logging():
    """Set up logging for 'kuryr-kubernetes' and log the program version."""
    logging.setup(CONF, 'kuryr-kubernetes')
    default_levels = logging.get_default_log_levels()
    logging.set_defaults(default_log_levels=default_levels)

    k8s_version = version.version_info.version_string()
    LOG.info("Logging enabled!")
    program_info = {'prog': sys.argv[0], 'version': k8s_version}
    LOG.info("%(prog)s version %(version)s", program_info)
power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _switch(driver_info, enabled):
    """Ask the iBoot device to switch a relay, retrying at a fixed interval.

    ``conn.switch`` is called every ``CONF.iboot.retry_interval`` until it
    returns a truthy response or the attempt counter exceeds
    ``CONF.iboot.max_retry``.

    :param driver_info: dict providing at least 'relay_id' and 'uuid', and
                        passed to ``_get_connection``
    :param enabled: value handed to ``conn.switch``; also selects the
                    power state named in the warning message
    :return: the last response stored from ``conn.switch`` (False when no
             call ever succeeded)
    """
    conn = _get_connection(driver_info)
    relay_id = driver_info['relay_id']
    progress = {'response': False, 'retries': 0}

    def _wait_for_switch(tracker):
        # Give up once the attempt counter has gone past the limit.
        if tracker['retries'] > CONF.iboot.max_retry:
            target_state = states.POWER_ON if enabled else states.POWER_OFF
            LOG.warning(
                'Reached maximum number of attempts (%(attempts)d) to set '
                'power state for node %(node)s to "%(op)s"',
                {'attempts': tracker['retries'],
                 'node': driver_info['uuid'],
                 'op': target_state})
            raise loopingcall.LoopingCallDone()

        try:
            tracker['retries'] += 1
            tracker['response'] = conn.switch(relay_id, enabled)
            if tracker['response']:
                # A truthy response ends the loop.
                raise loopingcall.LoopingCallDone()
        except (TypeError, IndexError):
            # Log and let the looping call try again on the next tick.
            LOG.warning("Cannot call set power state for node '%(node)s' "
                        "at relay '%(relay)s'. iBoot switch() failed.",
                        {'node': driver_info['uuid'], 'relay': relay_id})

    retry_timer = loopingcall.FixedIntervalLoopingCall(_wait_for_switch,
                                                       progress)
    retry_timer.start(interval=CONF.iboot.retry_interval).wait()
    return progress['response']


问题


面经


文章

微信
公众号

扫码关注公众号