python类iteritems()的实例源码

charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sync_helpers(include, src, dest, options=None):
    if not os.path.isdir(dest):
        os.makedirs(dest)

    global_options = parse_sync_options(options)

    for inc in include:
        if isinstance(inc, str):
            inc, opts = extract_options(inc, global_options)
            sync(src, dest, inc, opts)
        elif isinstance(inc, dict):
            # could also do nested dicts here.
            for k, v in six.iteritems(inc):
                if isinstance(v, list):
                    for m in v:
                        inc, opts = extract_options(m, global_options)
                        sync(src, dest, '%s.%s' % (k, inc), opts)
cqltypes.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def serialize_safe(cls, themap, protocol_version):
        key_type, value_type = cls.subtypes
        pack = int32_pack if protocol_version >= 3 else uint16_pack
        buf = io.BytesIO()
        buf.write(pack(len(themap)))
        try:
            items = six.iteritems(themap)
        except AttributeError:
            raise TypeError("Got a non-map object for a map value")
        inner_proto = max(3, protocol_version)
        for key, val in items:
            keybytes = key_type.to_binary(key, inner_proto)
            valbytes = value_type.to_binary(val, inner_proto)
            buf.write(pack(len(keybytes)))
            buf.write(keybytes)
            buf.write(pack(len(valbytes)))
            buf.write(valbytes)
        return buf.getvalue()
cluster.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_hacluster_config(exclude_keys=None):
    '''
    Obtains all relevant configuration from charm configuration required
    for initiating a relation to hacluster:

        ha-bindiface, ha-mcastport, vip

    param: exclude_keys: list of setting key(s) to be excluded.
    returns: dict: A dict containing settings keyed by setting name.
    raises: HAIncompleteConfig if settings are missing.
    '''
    settings = ['ha-bindiface', 'ha-mcastport', 'vip']
    conf = {}
    for setting in settings:
        if exclude_keys and setting in exclude_keys:
            continue

        conf[setting] = config_get(setting)
    missing = []
    [missing.append(s) for s, v in six.iteritems(conf) if v is None]
    if missing:
        log('Insufficient config data to configure hacluster.', level=ERROR)
        raise HAIncompleteConfig
    return conf
loopback.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ensure_loopback_device(path, size):
    '''
    Ensure a loopback device exists for a given backing file path and size.
    If it a loopback device is not mapped to file, a new one will be created.

    TODO: Confirm size of found loopback device.

    :returns: str: Full path to the ensured loopback device (eg, /dev/loop0)
    '''
    for d, f in six.iteritems(loopback_devices()):
        if f == path:
            return d

    if not os.path.exists(path):
        cmd = ['truncate', '--size', size, path]
        check_call(cmd)

    return create_loopback(path)
context.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def context_complete(self, ctxt):
        """Check for missing data for the required context data.
        Set self.missing_data if it exists and return False.
        Set self.complete if no missing data and return True.
        """
        # Fresh start
        self.complete = False
        self.missing_data = []
        for k, v in six.iteritems(ctxt):
            if v is None or v == '':
                if k not in self.missing_data:
                    self.missing_data.append(k)

        if self.missing_data:
            self.complete = False
            log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
        else:
            self.complete = True
        return self.complete
context.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self):
        ports = config('data-port')
        if ports:
            # Map of {port/mac:bridge}
            portmap = parse_data_port_mappings(ports)
            ports = portmap.keys()
            # Resolve provided ports or mac addresses and filter out those
            # already attached to a bridge.
            resolved = self.resolve_ports(ports)
            # FIXME: is this necessary?
            normalized = {get_nic_hwaddr(port): port for port in resolved
                          if port not in ports}
            normalized.update({port: port for port in resolved
                               if port in ports})
            if resolved:
                return {normalized[port]: bridge for port, bridge in
                        six.iteritems(portmap) if port in normalized.keys()}

        return None
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_os_version_package(pkg, fatal=True):
    '''Derive OpenStack version number from an installed package.'''
    codename = get_os_codename_package(pkg, fatal=fatal)

    if not codename:
        return None

    if 'swift' in pkg:
        vers_map = SWIFT_CODENAMES
        for cname, version in six.iteritems(vers_map):
            if cname == codename:
                return version[-1]
    else:
        vers_map = OPENSTACK_CODENAMES
        for version, cname in six.iteritems(vers_map):
            if cname == codename:
                return version
    # e = "Could not determine OpenStack version for package: %s" % pkg
    # error_out(e)
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save_script_rc(script_path="scripts/scriptrc", **env_vars):
    """
    Write an rc file in the charm-delivered directory containing
    exported environment variables provided by env_vars. Any charm scripts run
    outside the juju hook environment can source this scriptrc to obtain
    updated config information necessary to perform health checks or
    service changes.
    """
    juju_rc_path = "%s/%s" % (charm_dir(), script_path)
    if not os.path.exists(os.path.dirname(juju_rc_path)):
        os.mkdir(os.path.dirname(juju_rc_path))
    with open(juju_rc_path, 'wb') as rc_script:
        rc_script.write(
            "#!/bin/bash\n")
        [rc_script.write('export %s=%s\n' % (u, p))
         for u, p in six.iteritems(env_vars) if u != "script_path"]
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _validate_dict_data(self, expected, actual):
        """Validate dictionary data.

           Compare expected dictionary data vs actual dictionary data.
           The values in the 'expected' dictionary can be strings, bools, ints,
           longs, or can be a function that evaluates a variable and returns a
           bool.
           """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for k, v in six.iteritems(expected):
            if k in actual:
                if (isinstance(v, six.string_types) or
                        isinstance(v, bool) or
                        isinstance(v, six.integer_types)):
                    # handle explicit values
                    if v != actual[k]:
                        return "{}:{}".format(k, actual[k])
                # handle function pointers, such as not_null or valid_ip
                elif not v(actual[k]):
                    return "{}:{}".format(k, actual[k])
            else:
                return "key '{}' does not exist".format(k)
        return None
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run_action(self, unit_sentry, action,
                   _check_output=subprocess.check_output,
                   params=None):
        """Run the named action on a given unit sentry.

        params a dict of parameters to use
        _check_output parameter is used for dependency injection.

        @return action_id.
        """
        unit_id = unit_sentry.info["unit_name"]
        command = ["juju", "action", "do", "--format=json", unit_id, action]
        if params is not None:
            for key, value in params.iteritems():
                command.append("{}={}".format(key, value))
        self.log.info("Running command: %s\n" % " ".join(command))
        output = _check_output(command, universal_newlines=True)
        data = json.loads(output)
        action_id = data[u'Action queued with id']
        return action_id
hubble.py 文件源码 项目:nova 作者: hubblestack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_top_data(topfile):
    '''
    Helper method to retrieve and parse the nova topfile
    '''
    topfile = os.path.join(_hubble_dir()[1], topfile)

    try:
        with open(topfile) as handle:
            topdata = yaml.safe_load(handle)
    except Exception as e:
        raise CommandExecutionError('Could not load topfile: {0}'.format(e))

    if not isinstance(topdata, dict) or 'nova' not in topdata or \
            not(isinstance(topdata['nova'], dict)):
        raise CommandExecutionError('Nova topfile not formatted correctly')

    topdata = topdata['nova']

    ret = []

    for match, data in topdata.iteritems():
        if __salt__['match.compound'](match):
            ret.extend(data)

    return ret
eventrequest.py 文件源码 项目:estreamer 作者: spohara79 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, start_from, **kwargs):
        self.flags = Flags()
        if start_from == 0 or start_from == 0xFFFFFFFF:
            self.timestamp = start_from
        else:
            try:
                datetime.fromtimestamp(start_from)
            except TypeError as exc:
                raise_from(InvalidTimestampError('Timestamp invalid (0, 0xFFFFFFFF, or Unix Timestamp'), exc)
            else:
                self.timestamp = start_from
        for k,v in iteritems(kwargs):
            try:
              getattr(self.flags.flag, k)
              setattr(self.flags.flag, k, int(v))
            except AttributeError as exc:
                raise_from(InvalidFlagError('Invalid flag: {}'.format(k)), exc)

        # save the timestamp and flags for reuse (if needed)
        Struct.set_ts(self.timestamp)
        Struct.set_flags(self.flags.from_bytes)
        # build the request
        self.event_request = EventRequest(timestamp=self.timestamp,flags=self.flags.from_bytes)
        self.message_header =  MessageHeader(type=2, data=self.event_request.pack())
        self.record = self.message_header.pack()
oauth.py 文件源码 项目:DropboxConnect 作者: raguay 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _params_to_urlencoded(params):
    """
    Returns a application/x-www-form-urlencoded ``str`` representing the
    key/value pairs in ``params``.

    Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
    the exception of unicode objects which are utf8-encoded.
    """
    def encode(o):
        if isinstance(o, six.binary_type):
            return o
        else:
            if isinstance(o, six.text_type):
                return o.encode('utf-8')
            else:
                return str(o).encode('utf-8')

    utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
    return url_encode(utf8_params)
views.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save_changes(self):
        new_projects = []
        removed_projects = []

        for project_id, project in six.iteritems(self.projects_by_id):
            if project_id in self.cleaned_data['projects']:
                if enable_plugin_for_tenant(project, self.tenant):
                    new_projects.append(project)
            else:
                if disable_plugin_for_tenant(project, self.tenant):
                    removed_projects.append(project)

        if new_projects or removed_projects:
            with Context.for_tenant(self.tenant) as ctx:
                ctx.send_notification(
                    **make_subscription_update_notification(new_projects, removed_projects)
                )
                if removed_projects:
                    mentions.clear_project_mentions(self.tenant, removed_projects)
                ctx.push_recent_events_glance()
host.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def service(action, service_name, **kwargs):
    """Control a system service.

    :param action: the action to take on the service
    :param service_name: the name of the service to perform th action on
    :param **kwargs: additional params to be passed to the service command in
                    the form of key=value.
    """
    if init_is_systemd():
        cmd = ['systemctl', action, service_name]
    else:
        cmd = ['service', service_name, action]
        for key, value in six.iteritems(kwargs):
            parameter = '%s=%s' % (key, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
loopback.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def ensure_loopback_device(path, size):
    '''
    Ensure a loopback device exists for a given backing file path and size.
    If it a loopback device is not mapped to file, a new one will be created.

    TODO: Confirm size of found loopback device.

    :returns: str: Full path to the ensured loopback device (eg, /dev/loop0)
    '''
    for d, f in six.iteritems(loopback_devices()):
        if f == path:
            return d

    if not os.path.exists(path):
        cmd = ['truncate', '--size', size, path]
        check_call(cmd)

    return create_loopback(path)
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_os_version_package(pkg, fatal=True):
    '''Derive OpenStack version number from an installed package.'''
    codename = get_os_codename_package(pkg, fatal=fatal)

    if not codename:
        return None

    if 'swift' in pkg:
        vers_map = SWIFT_CODENAMES
        for cname, version in six.iteritems(vers_map):
            if cname == codename:
                return version[-1]
    else:
        vers_map = OPENSTACK_CODENAMES
        for version, cname in six.iteritems(vers_map):
            if cname == codename:
                return version
    # e = "Could not determine OpenStack version for package: %s" % pkg
    # error_out(e)


# Module local cache variable for the os_release.
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save_script_rc(script_path="scripts/scriptrc", **env_vars):
    """
    Write an rc file in the charm-delivered directory containing
    exported environment variables provided by env_vars. Any charm scripts run
    outside the juju hook environment can source this scriptrc to obtain
    updated config information necessary to perform health checks or
    service changes.
    """
    juju_rc_path = "%s/%s" % (charm_dir(), script_path)
    if not os.path.exists(os.path.dirname(juju_rc_path)):
        os.mkdir(os.path.dirname(juju_rc_path))
    with open(juju_rc_path, 'wt') as rc_script:
        rc_script.write(
            "#!/bin/bash\n")
        [rc_script.write('export %s=%s\n' % (u, p))
         for u, p in six.iteritems(env_vars) if u != "script_path"]
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_svc_catalog_endpoint_data(self, expected, actual):
        """Validate service catalog endpoint data.

           Validate a list of actual service catalog endpoints vs a list of
           expected service catalog endpoints.
           """
        self.log.debug('Validating service catalog endpoint data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for k, v in six.iteritems(expected):
            if k in actual:
                ret = self._validate_dict_data(expected[k][0], actual[k][0])
                if ret:
                    return self.endpoint_error(k, ret)
            else:
                return "endpoint {} does not exist".format(k)
        return ret
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _validate_dict_data(self, expected, actual):
        """Validate dictionary data.

           Compare expected dictionary data vs actual dictionary data.
           The values in the 'expected' dictionary can be strings, bools, ints,
           longs, or can be a function that evaluates a variable and returns a
           bool.
           """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for k, v in six.iteritems(expected):
            if k in actual:
                if (isinstance(v, six.string_types) or
                        isinstance(v, bool) or
                        isinstance(v, six.integer_types)):
                    # handle explicit values
                    if v != actual[k]:
                        return "{}:{}".format(k, actual[k])
                # handle function pointers, such as not_null or valid_ip
                elif not v(actual[k]):
                    return "{}:{}".format(k, actual[k])
            else:
                return "key '{}' does not exist".format(k)
        return None
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_unit_process_ids(self, unit_processes, expect_success=True):
        """Construct a dict containing unit sentries, process names, and
        process IDs.

        :param unit_processes: A dictionary of Amulet sentry instance
            to list of process names.
        :param expect_success: if False expect the processes to not be
            running, raise if they are.
        :returns: Dictionary of Amulet sentry instance to dictionary
            of process names to PIDs.
        """
        pid_dict = {}
        for sentry_unit, process_list in six.iteritems(unit_processes):
            pid_dict[sentry_unit] = {}
            for process in process_list:
                pids = self.get_process_id_list(
                    sentry_unit, process, expect_success=expect_success)
                pid_dict[sentry_unit].update({process: pids})
        return pid_dict
host.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def service(action, service_name, **kwargs):
    """Control a system service.

    :param action: the action to take on the service
    :param service_name: the name of the service to perform th action on
    :param **kwargs: additional params to be passed to the service command in
                    the form of key=value.
    """
    if init_is_systemd():
        cmd = ['systemctl', action, service_name]
    else:
        cmd = ['service', service_name, action]
        for key, value in six.iteritems(kwargs):
            parameter = '%s=%s' % (key, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _apply_overrides(settings, overrides, schema):
    """Get overrides config overlayed onto modules defaults.

    :param modules: require stack modules config.
    :returns: dictionary of modules config with user overrides applied.
    """
    if overrides:
        for k, v in six.iteritems(overrides):
            if k in schema:
                if schema[k] is None:
                    settings[k] = v
                elif type(schema[k]) is dict:
                    settings[k] = _apply_overrides(settings[k], overrides[k],
                                                   schema[k])
                else:
                    raise Exception("Unexpected type found in schema '%s'" %
                                    type(schema[k]), level=ERROR)
            else:
                log("Unknown override key '%s' - ignoring" % (k), level=INFO)

    return settings
neutron.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def parse_vlan_range_mappings(mappings):
    """Parse vlan range mappings.

    Mappings must be a space-delimited list of provider:start:end mappings.

    The start:end range is optional and may be omitted.

    Returns dict of the form {provider: (start, end)}.
    """
    _mappings = parse_mappings(mappings)
    if not _mappings:
        return {}

    mappings = {}
    for p, r in six.iteritems(_mappings):
        mappings[p] = tuple(r.split(':'))

    return mappings
context.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def context_complete(self, ctxt):
        """Check for missing data for the required context data.
        Set self.missing_data if it exists and return False.
        Set self.complete if no missing data and return True.
        """
        # Fresh start
        self.complete = False
        self.missing_data = []
        for k, v in six.iteritems(ctxt):
            if v is None or v == '':
                if k not in self.missing_data:
                    self.missing_data.append(k)

        if self.missing_data:
            self.complete = False
            log('Missing required data: %s' % ' '.join(self.missing_data),
                level=INFO)
        else:
            self.complete = True
        return self.complete
context.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __call__(self):
        ports = config('data-port')
        if ports:
            # Map of {port/mac:bridge}
            portmap = parse_data_port_mappings(ports)
            ports = portmap.keys()
            # Resolve provided ports or mac addresses and filter out those
            # already attached to a bridge.
            resolved = self.resolve_ports(ports)
            # FIXME: is this necessary?
            normalized = {get_nic_hwaddr(port): port for port in resolved
                          if port not in ports}
            normalized.update({port: port for port in resolved
                               if port in ports})
            if resolved:
                return {normalized[port]: bridge for port, bridge in
                        six.iteritems(portmap) if port in normalized.keys()}

        return None
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_os_version_package(pkg, fatal=True):
    '''Derive OpenStack version number from an installed package.'''
    codename = get_os_codename_package(pkg, fatal=fatal)

    if not codename:
        return None

    if 'swift' in pkg:
        vers_map = SWIFT_CODENAMES
        for cname, version in six.iteritems(vers_map):
            if cname == codename:
                return version[-1]
    else:
        vers_map = OPENSTACK_CODENAMES
        for version, cname in six.iteritems(vers_map):
            if cname == codename:
                return version
    # e = "Could not determine OpenStack version for package: %s" % pkg
    # error_out(e)


# Module local cache variable for the os_release.
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save_script_rc(script_path="scripts/scriptrc", **env_vars):
    """
    Write an rc file in the charm-delivered directory containing
    exported environment variables provided by env_vars. Any charm scripts run
    outside the juju hook environment can source this scriptrc to obtain
    updated config information necessary to perform health checks or
    service changes.
    """
    juju_rc_path = "%s/%s" % (charm_dir(), script_path)
    if not os.path.exists(os.path.dirname(juju_rc_path)):
        os.mkdir(os.path.dirname(juju_rc_path))
    with open(juju_rc_path, 'wt') as rc_script:
        rc_script.write(
            "#!/bin/bash\n")
        [rc_script.write('export %s=%s\n' % (u, p))
         for u, p in six.iteritems(env_vars) if u != "script_path"]
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_svc_catalog_endpoint_data(self, expected, actual):
        """Validate service catalog endpoint data.

           Validate a list of actual service catalog endpoints vs a list of
           expected service catalog endpoints.
           """
        self.log.debug('Validating service catalog endpoint data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for k, v in six.iteritems(expected):
            if k in actual:
                ret = self._validate_dict_data(expected[k][0], actual[k][0])
                if ret:
                    return self.endpoint_error(k, ret)
            else:
                return "endpoint {} does not exist".format(k)
        return ret
profile.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def merge_default_with_oplog(graph, op_log=None, run_meta=None):
  """Monkeypatch. There currently is a bug in tfprof_logger that
    prevents it from being used with Python 3. So we override the method
    manually until the fix comes in.
  """
  tmp_op_log = tfprof_log_pb2.OpLog()
  # pylint: disable=W0212
  logged_ops = tfprof_logger._get_logged_ops(graph, run_meta)
  if not op_log:
    tmp_op_log.log_entries.extend(logged_ops.values())
  else:
    all_ops = dict()
    for entry in op_log.log_entries:
      all_ops[entry.name] = entry
    for op_name, entry in six.iteritems(logged_ops):
      if op_name in all_ops:
        all_ops[op_name].types.extend(entry.types)
        if entry.float_ops > 0 and all_ops[op_name].float_ops == 0:
          all_ops[op_name].float_ops = entry.float_ops
      else:
        all_ops[op_name] = entry
    tmp_op_log.log_entries.extend(all_ops.values())
  return tmp_op_log


问题


面经


文章

微信
公众号

扫码关注公众号