python类SafeConfigParser()的实例源码

base.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, config):
                """Load the classification data file named by the config.

                The path is read from the "info_classification_path" option
                of the "pkglint" section of config.  If the file exists and
                parses, the parser is stored in self.classification_data.
                Otherwise self.classification_data stays None and
                self.bad_classification_data holds a message saying why.
                """

                self.classification_data = None

                self.classification_path = config.get(
                    "pkglint", "info_classification_path")
                self.skip_classification_check = False

                # a default error message used if we've parsed the
                # data file, but haven't thrown any exceptions
                self.bad_classification_data = _("no sections found in data "
                    "file {0}").format(self.classification_path)

                if os.path.exists(self.classification_path):
                        try:
                                if six.PY2:
                                        parser = \
                                            configparser.SafeConfigParser()
                                        read = parser.readfp
                                else:
                                        # SafeConfigParser has been renamed to
                                        # ConfigParser in Python 3.2.
                                        parser = configparser.ConfigParser()
                                        read = parser.read_file
                                # the context manager closes the data file
                                # even when parsing fails
                                with open(self.classification_path) as f:
                                        read(f)
                                # only publish the parser once it has parsed
                                # cleanly, so a failure really does leave
                                # classification_data as None
                                self.classification_data = parser
                        except Exception as err:
                                # any exception thrown here results in a null
                                # classification_data object.  We deal with that
                                # later.
                                self.bad_classification_data = _(
                                    "unable to parse data file {path}: "
                                    "{err}").format(
                                    path=self.classification_path,
                                    err=err)
                else:
                        self.bad_classification_data = _("missing file {0}").format(
                            self.classification_path)
                super(ManifestChecker, self).__init__(config)
config.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, config_file=None):
                """Build self.config from a pkglintrc file.

                If config_file is given, only that file is read.  Otherwise
                /usr/share/lib/pkg/pkglintrc is read, followed by
                ~/.pkglintrc.

                Raises PkglintConfigException if config_file cannot be
                opened, or if configparser reports an error while reading
                the files or looking up the "log_level" option of the
                "pkglint" section.
                """

                if config_file:
                        try:
                                # ConfigParser doesn't do a good job of
                                # error reporting, so we'll just try to open
                                # the file
                                open(config_file, "r").close()
                        except (EnvironmentError) as err:
                                raise PkglintConfigException(
                                    _("unable to read config file: {0} ").format(
                                    err))
                try:
                        if six.PY2:
                                self.config = configparser.SafeConfigParser(
                                    defaults)
                        else:
                                # SafeConfigParser has been renamed to
                                # ConfigParser in Python 3.2.
                                self.config = configparser.ConfigParser(
                                    defaults)
                        if not config_file:
                                # the context manager closes the system
                                # pkglintrc once it has been parsed
                                with open(
                                    "/usr/share/lib/pkg/pkglintrc") as f:
                                        if six.PY2:
                                                self.config.readfp(f)
                                        else:
                                                self.config.read_file(f)
                                self.config.read(
                                    [os.path.expanduser("~/.pkglintrc")])
                        else:
                                self.config.read(config_file)

                        # sanity check our config by looking for a known key
                        self.config.get("pkglint", "log_level")
                except configparser.Error as err:
                        raise PkglintConfigException(
                            _("missing or corrupt pkglintrc file "
                            "{config_file}: {err}").format(
                            config_file=config_file, err=err))
test_util.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def config_from_ini(self, ini):
        """Parse INI-formatted text into a dict of per-section dicts.

        ini is dedented before parsing.  The result maps each section
        name to a dict of that section's option/value pairs.
        """
        # SafeConfigParser and readfp() were removed in Python 3.12, so
        # only use them on Python 2, where ConfigParser is the unsafe variant.
        if six.PY3:
            parser = configparser.ConfigParser()
            read_file = parser.read_file
        else:
            parser = configparser.SafeConfigParser()
            read_file = parser.readfp
        ini = textwrap.dedent(six.u(ini))
        read_file(io.StringIO(ini))
        return {
            section: dict(parser.items(section))
            for section in parser.sections()
        }
server.py 文件源码 项目:python-zulip-api 作者: zulip 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def read_config_file(config_file_path):
    # type: (str) -> None
    """Load per-bot credentials from an INI file into ``bots_config``.

    The path is user-expanded and made absolute first.  Every section of
    the file becomes one ``bots_config`` entry holding that section's
    ``email``, ``key`` and ``site`` options.

    Raises IOError if the resolved path is not an existing file.
    """
    resolved_path = os.path.abspath(os.path.expanduser(config_file_path))
    if not os.path.isfile(resolved_path):
        raise IOError("Could not read config file {}: File not found.".format(resolved_path))

    ini = SafeConfigParser()
    ini.read(resolved_path)

    for bot_name in ini.sections():
        bots_config[bot_name] = {
            field: ini.get(bot_name, field)
            for field in ("email", "key", "site")
        }
tasks.py 文件源码 项目:delft3d-gt-server 作者: openearth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_docker_create(self, label, parameters, environment, name, image,
                     volumes, memory_limit, folders, command):
    """
    Prepare the mount folders and create a docker container.

    Each path in folders is created if it does not exist yet, and an
    input.ini built from parameters (a mapping of section -> options)
    is written into every one of them. Options named 'units' are left
    out of the ini file.

    Returns a tuple of the new container's id and an empty string.
    """
    # Make sure every mount directory exists
    for mount_dir in folders:
        try:
            os.makedirs(mount_dir, 0o2775)
        except OSError:
            # An existing directory is fine; anything else is an error
            if not os.path.isdir(mount_dir):
                raise

    # Build the ini contents shared by all containers
    ini = configparser.SafeConfigParser()
    for section in parameters:
        if not ini.has_section(section):
            ini.add_section(section)
        for key, value in parameters[section].items():
            # TODO: find more elegant solution for this! ugh!
            if key == 'units':
                continue
            if ini.has_option(section, key):
                continue
            ini.set(str(section), str(key), str(value))

    # Write the same ini file into each folder
    for mount_dir in folders:
        ini_path = os.path.join(mount_dir, 'input.ini')
        with open(ini_path, 'w') as ini_file:
            ini.write(ini_file)

    # Create docker container
    docker = Client(base_url=settings.DOCKER_URL)
    # We could also pass mem_reservation since docker-py 1.10
    host_config = docker.create_host_config(
        binds=volumes, mem_limit=memory_limit)
    container = docker.create_container(
        image,  # docker image
        name=name,
        host_config=host_config,  # mounts
        command=command,  # command to run
        environment=environment,  # {'uuid' = ""} for cloud fs sync
        labels=label  # type of container
    )
    return container.get('Id'), ""
test_tasks.py 文件源码 项目:delft3d-gt-server 作者: openearth 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_do_docker_create(self, mockClient):
        """
        Assert that the docker_create task
        calls the docker client.create_container() function,
        creates the requested folders and writes an input.ini
        holding the given parameters (minus 'units') into each.
        """
        image = "IMAGENAME"
        volumes = ['/:/data/output:z',
                   '/:/data/input:ro']
        memory_limit = '1g'
        command = "echo test"
        config = {}
        environment = {'a': 1, 'b': 2}
        label = {"type": "delft3d"}
        folder = ['input', 'output']
        name = 'test-8172318273'
        workingdir = os.path.join(os.getcwd(), 'test')
        folders = [os.path.join(workingdir, f) for f in folder]
        parameters = {u'test':
                      {u'1': u'a', u'2': u'b', 'units': 'ignoreme'}
                      }
        mockClient.return_value.create_host_config.return_value = config

        do_docker_create.delay(label, parameters, environment, name,
                               image, volumes, memory_limit, folders, command)

        # Assert that docker is called
        mockClient.return_value.create_container.assert_called_with(
            image,
            host_config=config,
            command=command,
            name=name,
            environment=environment,
            labels=label
        )

        # Assert that folders are created: check the expected names
        # against the directory listing (iterating the listing itself
        # would compare it with itself and could never fail)
        listdir = os.listdir(workingdir)
        for f in folder:
            self.assertIn(f, listdir)

        for path in folders:
            ini = os.path.join(path, 'input.ini')
            self.assertTrue(os.path.isfile(ini))

            parser = configparser.SafeConfigParser()
            # close the ini file as soon as it has been parsed
            with open(ini) as ini_file:
                parser.readfp(ini_file)
            for key in parameters.keys():
                self.assertTrue(parser.has_section(key))
                for option, value in parameters[key].items():
                    if option != 'units':
                        self.assertTrue(parser.has_option(key, option))
                        self.assertEqual(parser.get(key, option), value)
                    else:  # units should be ignored
                        self.assertFalse(parser.has_option(key, option))
base.py 文件源码 项目:python-zunclient 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_config(self):
        """Read the functional test config and return its CLI settings.

        The config file path comes from the ZUNCLIENT_TEST_CONFIG
        environment variable, falling back to DEFAULT_CONFIG_FILE. The test
        is skipped when the file cannot be read, and failed when the
        auth_strategy is unknown or a required option is missing.

        :returns: dict mapping option name to its value from the
                  'functional' section
        """
        config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                     DEFAULT_CONFIG_FILE)
        # SafeConfigParser was deprecated in Python 3.2
        parser_cls = (config_parser.ConfigParser if six.PY3
                      else config_parser.SafeConfigParser)
        parser = parser_cls()
        if not parser.read(config_file):
            self.skipTest('Skipping, no test config found @ %s' % config_file)

        try:
            auth_strategy = parser.get('functional', 'auth_strategy')
        except config_parser.NoOptionError:
            auth_strategy = 'keystone'
        if auth_strategy not in ['keystone', 'noauth']:
            raise self.fail(
                'Invalid auth type specified: %s in functional must be '
                'one of: [keystone, noauth]' % auth_strategy)

        if auth_strategy == 'keystone':
            required = ['os_auth_url', 'os_username',
                        'os_password', 'os_project_name',
                        'os_identity_api_version']
            optional_v3 = ['os_user_domain_id',
                           'os_project_domain_id']
        else:
            required = ['os_auth_token', 'zun_url']
            optional_v3 = []

        cli_flags = {}
        missing = []
        for option in required + optional_v3:
            try:
                cli_flags[option] = parser.get('functional', option)
            except config_parser.NoOptionError:
                # NOTE(vdrok): Here we ignore the absence of KS v3 options as
                # v2 may be used. Keystone client will do the actual check of
                # the parameters' correctness.
                if option not in optional_v3:
                    missing.append(option)

        if missing:
            self.fail('Missing required setting in test.conf (%(conf)s) for '
                      'auth_strategy=%(auth)s: %(missing)s' %
                      {'conf': config_file,
                       'auth': auth_strategy,
                       'missing': ','.join(missing)})
        return cli_flags
config.py 文件源码 项目:scitokens 作者: scitokens 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def set_config(config = None):
    """
    Install the module-wide configuration and set up the "scitokens" logger.

    :param config: a path to an ini configuration file, a ConfigParser
        instance, or None to use the built-in defaults. Any other value
        leaves the current configuration untouched.
    """
    global configuration # pylint: disable=C0103

    if isinstance(config, six.string_types):
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.read([config])
    elif isinstance(config, configparser.RawConfigParser):
        configuration = config
    elif config is None:
        print("Using built-in defaults")
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.add_section("scitokens")

    logger = logging.getLogger("scitokens")

    if not configuration.has_option("scitokens", "log_file"):
        # No log file option at all: log to the default stream
        logger.addHandler(logging.StreamHandler())
    else:
        log_file = configuration.get("scitokens", "log_file")
        if log_file is not None:
            # Rotate at 100 * 1024 * 1000 bytes, keeping 5 backups
            rotating = logging.handlers.RotatingFileHandler(
                log_file, maxBytes=100 * (1024*1000), backupCount=5)
            logger.addHandler(rotating)

    # Map the configured level name to a logging level; unknown names
    # fall back to WARNING
    levels_by_name = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    log_level = configuration.get("scitokens", "log_level")
    logger.setLevel(levels_by_name.get(log_level, logging.WARNING))
diskbenchmark.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def benchmark(directory,
              volume=BENCHMARK_VOLUME,
              rw_type=BENCHMARK_RW_TYPE,
              job_number=BENCHMARK_JOB_NUMBER,
              thread_number=BENCHMARK_THREAD_NUMBER,
              block_size=BENCHMARK_IOPS_BLOCK_SIZE,
              max_seconds=BENCHMARK_MAX_SECONDS):
    """Run a fio benchmark inside ``directory`` and collect its metrics.

    A fio job file is written into ``directory``, fio is run against it
    with terse output, and four fields of that output are parsed.

    Returns a dict keyed by the read/write BPS and IOPS ``Metrics``
    values, or an empty dict when fio exits with a non-zero status.
    """
    config_file = os.path.join(directory, _BENCHMARK_CONFIG_FILE)
    result_file = os.path.join(directory, _BENCHMARK_RESULT_FILE)

    # prepare fio config
    fio_config = configparser.SafeConfigParser()
    fio_config.add_section('global')
    global_options = (
        ('group_reporting', '1'),
        ('unlink', '1'),
        ('time_based', '1'),
        ('direct', '1'),
        ('size', volume),
        ('rw', rw_type),
        ('numjobs', job_number),
        ('iodepth', thread_number),
        ('bs', block_size),
        ('runtime', max_seconds),
    )
    for option, value in global_options:
        fio_config.set('global', option, value)
    fio_config.add_section('drive')
    fio_config.set('drive', 'directory', directory)
    fs.write_safe(
        config_file,
        lambda f: fio_config.write(EqualSpaceRemover(f))
    )

    # start fio
    exit_code = subproc.call(
        ['fio', config_file, '--norandommap',
         '--minimal', '--output', result_file]
    )

    result = {}
    if exit_code != 0:
        return result

    # parse fio terse result
    # http://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
    with io.open(result_file) as fp:
        fields = fp.read().split(';')

    # fields 6 and 47 are scaled by 1024 before being stored
    result[Metrics.READ_BPS.value] = int(float(fields[6]) * 1024)
    result[Metrics.READ_IOPS.value] = int(fields[7])
    result[Metrics.WRITE_BPS.value] = int(float(fields[47]) * 1024)
    result[Metrics.WRITE_IOPS.value] = int(fields[48])
    return result
profitbricks_inventory.py 文件源码 项目:ci 作者: dlang 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read_settings(self):
        ''' Load settings from profitbricks_inventory.ini next to this script.

        Options present in the [profitbricks] section are copied onto self.
        The group_by_* flags default to True and
        server_name_as_inventory_hostname defaults to False when absent;
        every other option is left untouched when absent. '''

        parser_cls = configparser.ConfigParser if six.PY3 else configparser.SafeConfigParser
        config = parser_cls()
        config.read(os.path.dirname(os.path.realpath(__file__)) + '/profitbricks_inventory.ini')

        section = 'profitbricks'

        # Credentials, API endpoint and cache path: plain string options
        string_options = (
            'subscription_user',
            'subscription_password',
            'subscription_password_file',
            'api_url',
            'cache_path',
        )
        for name in string_options:
            if config.has_option(section, name):
                setattr(self, name, config.get(section, name))

        # Cache age is stored as an integer
        if config.has_option(section, 'cache_max_age'):
            self.cache_max_age = config.getint(section, 'cache_max_age')

        # Group variables are written as a Python literal
        if config.has_option(section, 'vars'):
            self.vars = ast.literal_eval(config.get(section, 'vars'))

        # Boolean switches, each paired with its fallback value
        boolean_options = (
            ('group_by_datacenter_id', True),
            ('group_by_location', True),
            ('group_by_availability_zone', True),
            ('group_by_image_name', True),
            ('group_by_licence_type', True),
            ('server_name_as_inventory_hostname', False),
        )
        for name, fallback in boolean_options:
            if config.has_option(section, name):
                flag = config.getboolean(section, name)
            else:
                flag = fallback
            setattr(self, name, flag)
bless_config.py 文件源码 项目:python-blessclient 作者: lyft 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_config_file(self, config_file):
        """Parse an open config file object into the blessclient config dict.

        Values are read from the CLIENT, LAMBDA and MAIN sections on top of
        self.DEFAULT_CONFIG. For every alias in MAIN/region_aliases a
        KMSAUTH_CONFIG_<REGION> entry is added and its 'awsregion' value is
        recorded under REGION_ALIAS.
        """
        parser = SafeConfigParser(self.DEFAULT_CONFIG)
        parser.readfp(config_file)

        client_config = {
            'domain_regex': parser.get('CLIENT', 'domain_regex'),
            'cache_dir': parser.get('CLIENT', 'cache_dir'),
            'cache_file': parser.get('CLIENT', 'cache_file'),
            'mfa_cache_dir': parser.get('CLIENT', 'mfa_cache_dir'),
            'mfa_cache_file': parser.get('CLIENT', 'mfa_cache_file'),
            'ip_urls': [url.strip() for url in parser.get('CLIENT', 'ip_urls').split(",")],
            'update_script': parser.get('CLIENT', 'update_script'),
            'user_session_length': int(parser.get('CLIENT', 'user_session_length')),
            'usebless_role_session_length': int(parser.get('CLIENT', 'usebless_role_session_length')),
        }

        lambda_config = {
            'userrole': parser.get('LAMBDA', 'user_role'),
            'accountid': parser.get('LAMBDA', 'account_id'),
            'functionname': parser.get('LAMBDA', 'functionname'),
            'functionversion': parser.get('LAMBDA', 'functionversion'),
            'certlifetime': parser.getint('LAMBDA', 'certlifetime'),
            'ipcachelifetime': parser.getint('LAMBDA', 'ipcachelifetime'),
            'timeoutconfig': {
                'connect': parser.getint('LAMBDA', 'timeout_connect'),
                'read': parser.getint('LAMBDA', 'timeout_read')
            }
        }

        aws_config = {
            'bastion_ips': parser.get('MAIN', 'bastion_ips'),
            'remote_user': parser.get('MAIN', 'remote_user')
        }

        region_alias = {}
        blessconfig = {
            'CLIENT_CONFIG': client_config,
            'BLESS_CONFIG': lambda_config,
            'AWS_CONFIG': aws_config,
            'REGION_ALIAS': region_alias
        }

        # One KMS auth entry per comma-separated region alias
        for alias in parser.get('MAIN', 'region_aliases').split(","):
            region = alias.strip().upper()
            kms_region_key = 'KMSAUTH_CONFIG_{}'.format(region)
            blessconfig[kms_region_key] = self._get_region_kms_config(region, parser)
            region_alias[region] = blessconfig[kms_region_key]['awsregion']
        return blessconfig
add-node.py 文件源码 项目:openshift-ansible-contrib 作者: openshift 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_ini_file(self):
        ''' Update INI file with added number of nodes

        The INI path comes from the VMWARE_INI_PATH environment variable,
        falling back to ocp-on-vmware.ini next to this script. Node counts
        on self are adjusted according to self.node_type (and self.tag for
        storage nodes), then the matching *_nodes lines of the INI file are
        rewritten in place. '''
        # Local import: used to echo untouched lines back without print,
        # so the method parses on both Python 2 and Python 3.
        import sys

        scriptbasename = "ocp-on-vmware"
        defaults = {'vmware': {
            'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
            'master_nodes':'3',
            'infra_nodes':'2',
            'storage_nodes': '0',
            'app_nodes':'3' }
        }
        # where is the config?
        if six.PY3:
            config = configparser.ConfigParser()
        else:
            config = configparser.SafeConfigParser()

        vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
        vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
        config.read(vmware_ini_path)

        if 'app' in self.node_type:
            self.app_nodes = int(self.app_nodes) + int(self.node_number)
            config.set('vmware', 'app_nodes', str(self.app_nodes))
            print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
        if 'infra' in self.node_type:
            self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
            config.set('vmware', 'infra_nodes', str(self.infra_nodes))
            print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
        if 'storage' in self.node_type:
            # a 'clean' tag removes storage nodes instead of adding them
            if 'clean' in self.tag:
                self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
            else:
                self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
            config.set('vmware', 'storage_nodes', str(self.storage_nodes))
            print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))

        # With inplace=True, fileinput redirects standard output into the
        # file, so everything written inside this loop becomes its content.
        for line in fileinput.input(vmware_ini_path, inplace=True):
            if line.startswith("app_nodes"):
                print("app_nodes=" + str(self.app_nodes))
            elif line.startswith("infra_nodes"):
                print("infra_nodes=" + str(self.infra_nodes))
            elif line.startswith("storage_nodes"):
                print("storage_nodes=" + str(self.storage_nodes))
            else:
                # line already carries its own newline
                sys.stdout.write(line)
add-node.py 文件源码 项目:openshift-ansible 作者: nearform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def update_ini_file(self):
        ''' Update INI file with added number of nodes

        The INI path comes from the VMWARE_INI_PATH environment variable,
        falling back to ocp-on-vmware.ini next to this script. Node counts
        on self are adjusted according to self.node_type (and self.tag for
        storage nodes), then the matching *_nodes lines of the INI file are
        rewritten in place. '''
        # Local import: used to echo untouched lines back without print,
        # so the method parses on both Python 2 and Python 3.
        import sys

        scriptbasename = "ocp-on-vmware"
        defaults = {'vmware': {
            'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
            'master_nodes':'3',
            'infra_nodes':'2',
            'storage_nodes': '0',
            'app_nodes':'3' }
        }
        # where is the config?
        if six.PY3:
            config = configparser.ConfigParser()
        else:
            config = configparser.SafeConfigParser()

        vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
        vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
        config.read(vmware_ini_path)

        if 'app' in self.node_type:
            self.app_nodes = int(self.app_nodes) + int(self.node_number)
            config.set('vmware', 'app_nodes', str(self.app_nodes))
            print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
        if 'infra' in self.node_type:
            self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
            config.set('vmware', 'infra_nodes', str(self.infra_nodes))
            print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
        if 'storage' in self.node_type:
            # a 'clean' tag removes storage nodes instead of adding them
            if 'clean' in self.tag:
                self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
            else:
                self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
            config.set('vmware', 'storage_nodes', str(self.storage_nodes))
            print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))

        # With inplace=True, fileinput redirects standard output into the
        # file, so everything written inside this loop becomes its content.
        for line in fileinput.input(vmware_ini_path, inplace=True):
            if line.startswith("app_nodes"):
                print("app_nodes=" + str(self.app_nodes))
            elif line.startswith("infra_nodes"):
                print("infra_nodes=" + str(self.infra_nodes))
            elif line.startswith("storage_nodes"):
                print("storage_nodes=" + str(self.storage_nodes))
            else:
                # line already carries its own newline
                sys.stdout.write(line)


问题


面经


文章

微信
公众号

扫码关注公众号