python类OptGroup()的实例源码

config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_repositories_options():
        """Register the ``repositories`` option group on ``cfg.CONF``.

        The group carries the class paths of the offset and data-driven-spec
        repositories plus ``offsets_max_revisions``.
        """
        offsets_opt = cfg.StrOpt(
            'offsets',
            default='monasca_transform.offset_specs:JSONOffsetSpecs',
            help='Repository for offset persistence')
        specs_opt = cfg.StrOpt(
            'data_driven_specs',
            default=('monasca_transform.data_driven_specs.'
                     'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo'),
            help='Repository for metric and event data_driven_specs')
        max_revisions_opt = cfg.IntOpt(
            'offsets_max_revisions',
            default=10,
            help="Max revisions of offsets for each application")

        group = cfg.OptGroup(name='repositories', title='repositories')
        cfg.CONF.register_group(group)
        cfg.CONF.register_opts(
            [offsets_opt, specs_opt, max_revisions_opt], group=group)
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_messaging_options():
        """Register the ``messaging`` option group on ``cfg.CONF``.

        Covers the message adapter class paths, the topic names (regular and
        pre-hourly), the broker address and the project id used when
        publishing aggregated metrics.
        """
        adapter_opt = cfg.StrOpt(
            'adapter',
            default=('monasca_transform.messaging.adapter:'
                     'KafkaMessageAdapter'),
            help='Message adapter implementation')
        topic_opt = cfg.StrOpt(
            'topic', default='metrics', help='Messaging topic')
        brokers_opt = cfg.StrOpt(
            'brokers', default='192.168.10.4:9092', help='Messaging brokers')
        project_id_opt = cfg.StrOpt(
            'publish_kafka_project_id',
            default='111111',
            help='publish aggregated metrics tenant')
        adapter_pre_hourly_opt = cfg.StrOpt(
            'adapter_pre_hourly',
            default=('monasca_transform.messaging.adapter:'
                     'KafkaMessageAdapterPreHourly'),
            help='Message adapter implementation')
        topic_pre_hourly_opt = cfg.StrOpt(
            'topic_pre_hourly',
            default='metrics_pre_hourly',
            help='Messaging topic pre hourly')

        group = cfg.OptGroup(name='messaging', title='messaging')
        cfg.CONF.register_group(group)
        cfg.CONF.register_opts(
            [
                adapter_opt,
                topic_opt,
                brokers_opt,
                project_id_opt,
                adapter_pre_hourly_opt,
                topic_pre_hourly_opt,
            ],
            group=group)
utils.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_list_providers():
    """Build a ``Provider`` for every enabled driver of a known type.

    A group is registered for each section reported by
    ``CONF.list_all_sections()``. Each entry of
    ``CONF.providers.enable_drivers`` whose type is ``'openstack'`` or
    ``'amazon'`` then gets the matching option list registered under the
    driver's own group; entries of any other type are skipped.

    :return: list of ``Provider`` objects, one per recognised driver
    """
    # Make sure every configured section exists as a group before any
    # driver options are attached to it.
    for section_name in CONF.list_all_sections():
        CONF.register_group(cfg.OptGroup(section_name))

    providers = []
    for driver, driver_type in CONF.providers.enable_drivers.items():
        if driver_type == 'openstack':
            driver_opts = calplus.conf.providers.openstack_opts
        elif driver_type == 'amazon':
            driver_opts = calplus.conf.providers.amazon_opts
        else:
            # Unknown driver type: nothing to register, no provider.
            continue
        CONF.register_opts(driver_opts, driver)
        providers.append(Provider(driver_type, CONF.get(driver)))

    return providers
join.py 文件源码 项目:novajoin 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_allowed_hostclass(self, project_name):
        """Return the ``allowed_classes`` list configured for a project.

        If no group named ``project_name`` is known yet, the group and its
        ``allowed_classes`` list option are registered on the fly.

        :param project_name: name of the configuration group to read
        :return: the configured list, or an empty list when the option is
                 missing or empty
        """
        try:
            CONF[project_name]
        except cfg.NoSuchOptError:
            # The group is unknown so far: register it, together with the
            # option we are about to read.
            CONF.register_group(
                cfg.OptGroup(project_name, 'project options'))
            CONF.register_opt(cfg.ListOpt('allowed_classes'),
                              group=project_name)

        try:
            classes = CONF[project_name].allowed_classes
        except cfg.NoSuchOptError:
            LOG.error('No allowed_classes config option in [%s]', project_name)
            return []
        return classes or []
valet_api.py 文件源码 项目:valet 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _register_opts(self):
        """Register the ``valet`` group options with oslo.config.

        The group holds the API endpoint ``url``, the ``read_timeout``
        (default 5) and the number of ``retries`` (default 3).
        """
        valet_opts = [
            cfg.StrOpt('url', default=None,
                       help=_('API endpoint url')),
            cfg.IntOpt('read_timeout', default=5,
                       help=_('API read timeout in seconds')),
            cfg.IntOpt('retries', default=3,
                       help=_('API request retries')),
        ]

        valet_group = cfg.OptGroup('valet')
        CONF.register_group(valet_group)
        CONF.register_opts(valet_opts, group=valet_group)
client.py 文件源码 项目:Assistant-for-Software-Defined-Infrastructure 作者: shank7485 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, conf_path):
        """Load the keystone endpoint from the given configuration file.

        Registers an ``endpoint`` group with a single ``endpoint`` option on
        the global ``cfg.CONF``, parses ``conf_path`` as the default config
        file and stores the resulting value in ``self.AUTH_ENDPOINT``.

        :param conf_path: path of the configuration file to load
        """
        self.conf_path = conf_path

        endpoint_group = cfg.OptGroup(
            name='endpoint',
            title='Get the endpoints for keystone')
        self.opt_group = endpoint_group

        # NOTE(review): the default is the string 'None', not the None
        # object -- confirm whether callers rely on that.
        endpoint_opt = cfg.StrOpt(
            'endpoint',
            default='None',
            help=('URL or IP address where OpenStack keystone runs.'))
        self.endpoint_opts = [endpoint_opt]

        global_conf = cfg.CONF
        global_conf.register_group(self.opt_group)
        global_conf.register_opts(self.endpoint_opts, self.opt_group)

        global_conf(default_config_files=[self.conf_path])
        self.AUTH_ENDPOINT = global_conf.endpoint.endpoint
test_auth.py 文件源码 项目:mogan 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare a ``test_group`` config group with password auth options.

        Sets the keystone region, registers the auth options for the test
        group, selects the ``password`` auth type and fills in fake
        credentials.
        """
        super(AuthConfTestCase, self).setUp()
        self.config(region_name='fake_region', group='keystone')

        self.test_group = 'test_group'
        fixture_conf = self.cfg_fixture.conf
        fixture_conf.register_group(cfg.OptGroup(self.test_group))
        mogan_auth.register_auth_opts(fixture_conf, self.test_group)
        self.config(auth_type='password', group=self.test_group)

        # NOTE(pas-ha) this is due to auth_plugin options
        # being dynamically registered on first load,
        # but we need to set the config before
        password_loader = kaloading.get_plugin_loader('password')
        plugin_opts = kaloading.get_auth_plugin_conf_options(password_loader)
        self.cfg_fixture.register_opts(plugin_opts, group=self.test_group)

        fake_credentials = dict(
            auth_url='http://127.0.0.1:9898',
            username='fake_user',
            password='fake_pass',
            project_name='fake_tenant',
        )
        self.config(group=self.test_group, **fake_credentials)
valet_filter.py 文件源码 项目:valet 作者: att-comdev 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _register_opts(self):
        """Register this filter's options on the global ``cfg.CONF``.

        Option and group names come from the ``opt_*_str`` attributes of the
        instance; the failure mode is restricted to ``reject``/``yield`` and
        defaults to ``reject``.
        """
        filter_opts = [
            cfg.StrOpt(
                self.opt_failure_mode_str,
                choices=['reject', 'yield'],
                default='reject',
                help=_('Mode to operate in if Valet planning fails for any reason.')),
            cfg.StrOpt(
                self.opt_project_name_str,
                default=None,
                help=_('Valet Project Name')),
            cfg.StrOpt(
                self.opt_username_str,
                default=None,
                help=_('Valet Username')),
            cfg.StrOpt(
                self.opt_password_str,
                default=None,
                help=_('Valet Password')),
            cfg.StrOpt(
                self.opt_auth_uri_str,
                default=None,
                help=_('Keystone Authorization API Endpoint')),
        ]

        filter_group = cfg.OptGroup(self.opt_group_str)
        cfg.CONF.register_group(filter_group)
        cfg.CONF.register_opts(filter_opts, group=filter_group)

    # TODO(JD): Factor out common code between this and the cinder filter
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_database_options():
        """Register the ``database`` option group on ``cfg.CONF``.

        All connection settings are plain string options without defaults,
        except ``use_ssl`` which is a boolean defaulting to ``False``.
        """
        database_group = cfg.OptGroup(name='database', title='database')

        connection_opts = [
            cfg.StrOpt(opt_name)
            for opt_name in ('server_type', 'host', 'database_name',
                             'username', 'password')
        ]
        connection_opts.append(cfg.BoolOpt('use_ssl', default=False))
        connection_opts.append(cfg.StrOpt('ca_file'))

        cfg.CONF.register_group(database_group)
        cfg.CONF.register_opts(connection_opts, group=database_group)
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_service_options():
        """Register the ``service`` option group on ``cfg.CONF``.

        The group describes the coordinator, logging and Spark related
        settings. Only ``enable_debug_log_entries`` (``False``) and
        ``service_log_filename`` (``'monasca-transform.log'``) carry a
        default; every other option defaults to ``None``.
        """
        service_opts = [
            cfg.StrOpt('coordinator_address'),
            cfg.StrOpt('coordinator_group'),
            cfg.FloatOpt('election_polling_frequency'),
            # Use a real bool so the default never depends on
            # string-to-bool conversion of a truthy 'false' string.
            cfg.BoolOpt('enable_debug_log_entries', default=False),
            cfg.StrOpt('setup_file'),
            cfg.StrOpt('setup_target'),
            cfg.StrOpt('spark_driver'),
            cfg.StrOpt('service_log_path'),
            cfg.StrOpt('service_log_filename',
                       default='monasca-transform.log'),
            cfg.StrOpt('spark_event_logging_dest'),
            # NOTE(review): declared as a string option although the name
            # suggests a flag -- confirm against the consumers.
            cfg.StrOpt('spark_event_logging_enabled'),
            cfg.StrOpt('spark_jars_list'),
            cfg.StrOpt('spark_master_list'),
            cfg.StrOpt('spark_python_files'),
            cfg.IntOpt('stream_interval'),
            cfg.StrOpt('work_dir'),
            cfg.StrOpt('spark_home'),
            cfg.BoolOpt('enable_record_store_df_cache'),
            cfg.StrOpt('record_store_df_cache_storage_level')
        ]
        service_group = cfg.OptGroup(name='service', title='service')
        cfg.CONF.register_group(service_group)
        cfg.CONF.register_opts(service_opts, group=service_group)
config_initializer.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_stage_processors_options():
        """Register the ``stage_processors`` option group on ``cfg.CONF``.

        The group holds a single boolean, ``pre_hourly_processor_enabled``.
        """
        group_name = 'stage_processors'
        stage_group = cfg.OptGroup(name=group_name, title=group_name)
        stage_opts = [cfg.BoolOpt('pre_hourly_processor_enabled')]

        cfg.CONF.register_group(stage_group)
        cfg.CONF.register_opts(stage_opts, group=stage_group)
default_subnet.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_subnets(self, pod, project_id):
        """Return the configured default pod subnet, keyed by its id.

        :raises cfg.RequiredOptError: when ``neutron_defaults.pod_subnet``
                                      is unset or empty
        """
        pod_subnet_id = config.CONF.neutron_defaults.pod_subnet
        if pod_subnet_id:
            return {pod_subnet_id: _get_subnet(pod_subnet_id)}

        # NOTE(ivc): this option is only required for
        # DefaultPodSubnetDriver and its subclasses, but it may be
        # optional for other drivers (e.g. when each namespace has own
        # subnet)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('pod_subnet', missing_group)
default_subnet.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_subnets(self, service, project_id):
        """Return the configured default service subnet, keyed by its id.

        :raises cfg.RequiredOptError: when
                                      ``neutron_defaults.service_subnet`` is
                                      unset or empty
        """
        service_subnet_id = config.CONF.neutron_defaults.service_subnet
        if service_subnet_id:
            return {service_subnet_id: _get_subnet(service_subnet_id)}

        # NOTE(ivc): this option is only required for
        # DefaultServiceSubnetDriver and its subclasses, but it may be
        # optional for other drivers (e.g. when each namespace has own
        # subnet)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('service_subnet', missing_group)
default_security_groups.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_security_groups(self, pod, project_id):
        """Return a copy of the configured default pod security groups.

        :raises cfg.RequiredOptError: when
                                      ``neutron_defaults.pod_security_groups``
                                      is unset or empty
        """
        configured_groups = config.CONF.neutron_defaults.pod_security_groups
        if configured_groups:
            # Slice so the caller gets its own copy of the configured value.
            return configured_groups[:]

        # NOTE(ivc): this option is only required for
        # Default{Pod,Service}SecurityGroupsDriver and its subclasses,
        # but it may be optional for other drivers (e.g. when each
        # namespace has own set of security groups)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('pod_security_groups', missing_group)
default_security_groups.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_security_groups(self, service, project_id):
        """Return a copy of the default security groups for a service.

        :raises cfg.RequiredOptError: when
                                      ``neutron_defaults.pod_security_groups``
                                      is unset or empty
        """
        # NOTE(ivc): use the same option as DefaultPodSecurityGroupsDriver
        configured_groups = config.CONF.neutron_defaults.pod_security_groups
        if configured_groups:
            # Slice so the caller gets its own copy of the configured value.
            return configured_groups[:]

        # NOTE(ivc): this option is only required for
        # Default{Pod,Service}SecurityGroupsDriver and its subclasses,
        # but it may be optional for other drivers (e.g. when each
        # namespace has own set of security groups)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('pod_security_groups', missing_group)
default_project.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_project(self, pod):
        """Return the configured default project id for pods.

        :raises cfg.RequiredOptError: when ``neutron_defaults.project`` is
                                      unset or empty
        """
        default_project = config.CONF.neutron_defaults.project
        if default_project:
            return default_project

        # NOTE(ivc): this option is only required for
        # DefaultPodProjectDriver and its subclasses, but it may be
        # optional for other drivers (e.g. when each namespace has own
        # project)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('project', missing_group)
default_project.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_project(self, service):
        """Return the configured default project id for services.

        :raises cfg.RequiredOptError: when ``neutron_defaults.project`` is
                                      unset or empty
        """
        default_project = config.CONF.neutron_defaults.project
        if default_project:
            return default_project

        # NOTE(ivc): this option is only required for
        # DefaultServiceProjectDriver and its subclasses, but it may be
        # optional for other drivers (e.g. when each namespace has own
        # project)
        missing_group = cfg.OptGroup('neutron_defaults')
        raise cfg.RequiredOptError('project', missing_group)
exporter.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def main():
    """Start the Galaxia exporter service and serve RPC requests.

    Prepares the ``gexporter`` service from ``sys.argv``, registers the
    ``gexporter`` option group, then starts a broker on the configured topic
    and RabbitMQ host with a single ``controller.Controller`` endpoint.
    """
    service.prepare_service("gexporter", sys.argv)

    CONF = cfg.CONF
    # The title is written as one literal: a backslash continuation inside
    # the string would embed the next line's indentation in the title.
    opt_group = cfg.OptGroup(name='gexporter',
                             title='Options for the exporter service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)

    # Re-apply the currently resolved values as overrides.
    # NOTE(review): presumably this pins them for the lifetime of the
    # process -- confirm the intent.
    CONF.set_override('topic', CONF.gexporter.topic, opt_group)
    CONF.set_override('rabbitmq_host', CONF.gexporter.rabbitmq_host, opt_group)
    CONF.set_override('rabbitmq_port', CONF.gexporter.rabbitmq_port, opt_group)
    CONF.set_override('rabbitmq_username', CONF.gexporter.rabbitmq_username,
                      opt_group)

    endpoints = [
        controller.Controller(),
    ]

    log.info('Starting exporter service in PID %s' % os.getpid())

    rpc_server = broker.Broker(CONF.gexporter.topic,
                               CONF.gexporter.rabbitmq_host,
                               endpoints)
    # Parenthesised form prints the same text on Python 2 and is valid
    # syntax on Python 3.
    print('Galaxia Exporter service started in PID %s' % os.getpid())

    rpc_server.serve()
renderer.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    """Start the Galaxia renderer service and serve RPC requests.

    Prepares the ``grenderer`` service from ``sys.argv``, registers the
    ``grenderer`` option group, instantiates the endpoint selected by the
    ``handler`` option and starts a broker on the configured topic and
    RabbitMQ host.

    :raises KeyError: when the configured ``handler`` is not one of the
                      known handlers
    """
    service.prepare_service("grenderer", sys.argv)

    CONF = cfg.CONF
    # The title is written as one literal: a backslash continuation inside
    # the string would embed the next line's indentation in the title.
    opt_group = cfg.OptGroup(name='grenderer',
                             title='Options for the renderer service')
    CONF.register_group(opt_group)
    CONF.register_opts(API_SERVICE_OPTS, opt_group)

    # Re-apply the currently resolved values as overrides.
    # NOTE(review): presumably this pins them for the lifetime of the
    # process -- confirm the intent.
    CONF.set_override('topic', CONF.grenderer.topic, opt_group)
    CONF.set_override('rabbitmq_host', CONF.grenderer.rabbitmq_host, opt_group)
    CONF.set_override('rabbitmq_port', CONF.grenderer.rabbitmq_port, opt_group)
    CONF.set_override('rabbitmq_username', CONF.grenderer.rabbitmq_username,
                      opt_group)
    CONF.set_override('handler', CONF.grenderer.handler, opt_group)

    # Maps the value of the ``handler`` option to the endpoint class.
    handlers = {
        'prometheus': controller.Controller,
    }

    endpoints = [
        handlers[CONF.grenderer.handler](),
    ]

    log.info('Starting renderer service in PID %s' % os.getpid())

    rpc_server = broker.Broker(CONF.grenderer.topic,
                               CONF.grenderer.rabbitmq_host,
                               endpoints)
    # Parenthesised form prints the same text on Python 2 and is valid
    # syntax on Python 3.
    print('Galaxia Renderer service started in PID %s' % os.getpid())

    rpc_server.serve()
api.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_metrics_exporter(self, **kwargs):
        """Record a new metrics exporter and request its export over RPC.

        A fresh UUID is generated and stored in ``kwargs['exporter_id']``.
        The exporter name and id are inserted with
        ``query_list.INSERT_INTO_EXPORTER``, then ``kwargs`` is cast as the
        ``export_metrics`` message on the ``gapi`` exporter topic.

        :param kwargs: exporter attributes; ``exporter_name`` is required
        :return: a status message describing success, a database failure or
                 a messaging failure
        :raises KeyError: when ``exporter_name`` is missing from ``kwargs``
        """
        # NOTE(review): ``CONF`` is not bound in this method, so it is
        # expected to come from module scope with the ``gapi`` group already
        # registered -- confirm.
        exporter_id = str(uuid.uuid4())
        kwargs['exporter_id'] = exporter_id

        sql_query = query_list.INSERT_INTO_EXPORTER
        params = kwargs['exporter_name'], exporter_id
        try:
            conn = sql_helper.engine.connect()
            try:
                conn.execute(sql_query, params)
            finally:
                # Always hand the connection back, even if the insert fails.
                conn.close()
        except SQLAlchemyError:
            log.exception("Failed to store the metrics exporter")
            return ("A database exception has been hit and metrics exporter "
                    "has failed")

        log.info("Initializing a client connection")
        try:
            mq_client = client.Client(CONF.gapi.topic_exporter,
                                      CONF.gapi.rabbitmq_host)
            ctxt = {}
            log.info("Publishing message to rabbitmq")
            mq_client.rpc_client.cast(ctxt, 'export_metrics', message=kwargs)
        except Exception:
            # Broad on purpose: any messaging failure is reported to the
            # caller as a status string, but it is logged first.
            log.exception("Failed to publish the metrics export request")
            return ("A messaging exception has been hit and metrics export "
                    "request has failed")

        return "Metrics export request has been successfully accepted"
service.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def prepare_service(service_name, argv=None):
    """Initialize configuration, logging and the database for a service.

    Parses the command line into ``cfg.CONF``, registers the logging options
    under a group named after the service and the ``db`` options, sets up
    logging, initializes the database and loads the mappings.

    :param service_name: Name of the service; also used as the name of the
                         option group holding its logging options
    :param argv: Full argument vector (typically ``sys.argv``); the first
                 element is skipped. ``None`` means no arguments.
    :return: None
    """
    # ``None`` instead of a mutable ``[]`` default; behaviour for callers
    # that omit ``argv`` is unchanged.
    if argv is None:
        argv = []

    cfg.CONF(argv[1:], project='galaxia')

    # The title is written as one literal: a backslash continuation inside
    # the string would embed the next line's indentation in the title.
    opt_group = cfg.OptGroup(name=service_name,
                             title='Logging options for the service')
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(LOGGING_SERVICE_OPTS, opt_group)

    log_file = cfg.CONF._get("log_file", group=opt_group)
    log_level = cfg.CONF._get("log_level", group=opt_group)

    log_helper.setup_logging(log_file, log_level)

    db_group = cfg.OptGroup(name='db', title='Options for the database')
    cfg.CONF.register_group(db_group)
    cfg.CONF.register_opts(DB_SERVICE_OPTS, db_group)
    # Re-apply the currently resolved values as overrides.
    # NOTE(review): presumably this pins them for the lifetime of the
    # process -- confirm the intent.
    cfg.CONF.set_override('db_host', cfg.CONF.db.db_host, db_group)
    cfg.CONF.set_override('type', cfg.CONF.db.type, db_group)
    cfg.CONF.set_override('db_location', cfg.CONF.db.db_location, db_group)

    sql_helper.init_db(cfg.CONF.db.type, cfg.CONF.db.username,
                       cfg.CONF.db.password, cfg.CONF.db.db_host, "galaxia",
                       cfg.CONF.db.db_location)
    load_mapping.initialize()
test_conf.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_list_opts(self):
        """Check the shape of every ``(group, options)`` pair in list_opts.

        A string group must be ``'DEFAULT'``, any other group must be a
        ``cfg.OptGroup``, and every listed option must be a ``cfg.Opt``.
        """
        for group, group_opts in opts.list_opts():
            if not isinstance(group, six.string_types):
                self.assertIsInstance(group, cfg.OptGroup)
            else:
                self.assertEqual(group, 'DEFAULT')

            for single_opt in group_opts:
                self.assertIsInstance(single_opt, cfg.Opt)
test_conf.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_load_config_file_to_realize_all_driver(self):
        """Load the fake config file and check each driver's driver_name.

        Groups are registered for every section found in the file, and the
        openstack/amazon option lists are attached to the enabled drivers
        before the three expected ``driver_name`` values are asserted.
        """
        config_args = ['--config-file',
                       'calplus/tests/fake_config_file.conf']
        CONF(config_args)
        # TODO: Maybe we need remove example group,
        # such as: openstack and amazon

        # ensure all driver groups have been registered
        for section_name in CONF.list_all_sections():
            CONF.register_group(cfg.OptGroup(section_name))

        # ensure all of enable drivers configured exact opts
        for driver, driver_type in CONF.providers.enable_drivers.items():
            if driver_type == 'openstack':
                CONF.register_opts(
                    calplus.conf.providers.openstack_opts, driver)
            elif driver_type == 'amazon':
                CONF.register_opts(
                    calplus.conf.providers.amazon_opts, driver)

        self.assertEqual(CONF.openstack1['driver_name'], 'HUST')
        self.assertEqual(CONF.openstack2['driver_name'], 'SOICT')
        self.assertEqual(CONF.amazon['driver_name'], 'Amazon')
test_conf.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_list_opts(self):
        """Check the shape of every ``(group, options)`` pair in list_opts.

        A string group must be ``'DEFAULT'``, any other group must be a
        ``cfg.OptGroup``, and every listed option must be a ``cfg.Opt``.
        """
        for group, group_opts in opts.list_opts():
            group_is_name = isinstance(group, six.string_types)
            if group_is_name:
                self.assertEqual('DEFAULT', group)
            if not group_is_name:
                self.assertIsInstance(group, cfg.OptGroup)

            for single_opt in group_opts:
                self.assertIsInstance(single_opt, cfg.Opt)
ssl.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def list_opts():
    """Return the ssl options keyed by an ``OptGroup`` built for them.

    Only the first ``(group name, options)`` entry reported by
    ``sslutils.list_opts()`` is used.
    """
    first_entry = sslutils.list_opts()[0]
    group_name, ssl_opts = first_entry
    group = cfg.OptGroup(name=group_name, title='Options for the ssl')
    return {group: ssl_opts}
settings_oslo.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def register_opts(config):
    """Register every settings group and its options on ``config``.

    Each group is registered with its title and an empty help text,
    immediately followed by its option list.

    :param config: configuration object to register on
    :return: the same ``config`` object
    """
    group_table = (
        ('hardware', "Hardware settings", hardware_opts),
        ('underlay', "Underlay configuration", underlay_opts),
        ('k8s_deploy', "K8s deploy configuration", k8s_deploy_opts),
        ('k8s', "K8s config and credentials", k8s_opts),
        ('ccp_deploy', "CCP deploy configuration", ccp_deploy_opts),
        ('ccp', "CCP config and credentials", ccp_opts),
        ('os', "Openstack config and credentials", os_opts),
        ('os_deploy', "Openstack deploy config and credentials",
         os_deploy_opts),
    )

    for group_name, group_title, group_opts in group_table:
        config.register_group(
            cfg.OptGroup(name=group_name, title=group_title, help=""))
        config.register_opts(group=group_name, opts=group_opts)

    return config
service_providers.py 文件源码 项目:mixmatch 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def post_config(conf):
    """Register ``SP_OPTS`` under an ``sp_<name>`` group per provider.

    :param conf: configuration object exposing ``service_providers`` and
                 ``register_opts``
    """
    for sp_name in conf.service_providers:
        group_name = 'sp_%s' % sp_name
        conf.register_opts(
            SP_OPTS, cfg.OptGroup(name=group_name, title=sp_name))
test_conf.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_list_opts(self):
        """Check the shape of every ``(group, options)`` pair in list_opts.

        A string group must be ``'DEFAULT'``, any other group must be a
        ``cfg.OptGroup``, and every listed option must be a ``cfg.Opt``.
        """
        for entry in opts.list_opts():
            group, group_opts = entry
            if isinstance(group, six.string_types):
                self.assertEqual('DEFAULT', group)
            else:
                self.assertIsInstance(group, cfg.OptGroup)

            for single_opt in group_opts:
                self.assertIsInstance(single_opt, cfg.Opt)
valet_filter.py 文件源码 项目:valet 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _register_opts(self):
        """Register the ``valet`` group options used by this filter plugin.

        Besides the failure mode (``reject`` or ``yield``, default
        ``reject``) the group carries the filter's own keystone credentials.
        """
        failure_mode_opt = cfg.StrOpt(
            'failure_mode',
            choices=['reject', 'yield'],
            default='reject',
            help=_('Mode to operate in if Valet '
                   'planning fails for any reason.'))

        # In the filter plugin space, there's no access to Nova's
        # keystone credentials, so we have to specify our own.
        # This also means we can't act as the user making the request
        # at scheduling-time.
        credential_opts = [
            cfg.StrOpt('admin_tenant_name', default=None,
                       help=_('Valet Project Name')),
            cfg.StrOpt('admin_username', default=None,
                       help=_('Valet Username')),
            cfg.StrOpt('admin_password', default=None,
                       help=_('Valet Password')),
            cfg.StrOpt('admin_auth_url', default=None,
                       help=_('Keystone Authorization API Endpoint')),
        ]

        valet_group = cfg.OptGroup('valet')
        cfg.CONF.register_group(valet_group)
        cfg.CONF.register_opts([failure_mode_opt] + credential_opts,
                               group=valet_group)
statsd.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start():
    """Run the statsd UDP listener until interrupted from the keyboard.

    A datagram endpoint is opened on the configured statsd host and port,
    and the collected stats are flushed every ``flush_delay`` seconds.

    :raises cfg.RequiredOptError: when ``statsd.resource_id`` is not set
    """
    conf = service.prepare_service()

    if conf.statsd.resource_id is None:
        statsd_group = cfg.OptGroup("statsd")
        raise cfg.RequiredOptError("resource_id", statsd_group)

    collected = Stats(conf)
    event_loop = asyncio.get_event_loop()

    # TODO(jd) Add TCP support
    bind_address = (conf.statsd.host, conf.statsd.port)
    endpoint = event_loop.create_datagram_endpoint(
        lambda: StatsdServer(collected),
        local_addr=bind_address)

    def _reschedule_and_flush():
        # Schedule the next run first, then flush what has been gathered.
        event_loop.call_later(conf.statsd.flush_delay, _reschedule_and_flush)
        collected.flush()

    event_loop.call_later(conf.statsd.flush_delay, _reschedule_and_flush)
    transport, protocol = event_loop.run_until_complete(endpoint)

    LOG.info("Started on %s:%d", conf.statsd.host, conf.statsd.port)
    LOG.info("Flush delay: %d seconds", conf.statsd.flush_delay)

    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to cleanup.
        pass

    transport.close()
    event_loop.close()


问题


面经


文章

微信
公众号

扫码关注公众号