python类MutableMapping()的实例源码

collection.py 文件源码 项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def __find_and_modify(self, filter: dict, projection: Optional[Union[list, dict]],
                            sort: Optional[List[tuple]], upsert: Optional[bool] = None,
                            return_document: bool = ReturnDocument.BEFORE,
                            **kwargs) -> Optional[MutableMapping]:
        """Internal findAndModify helper.

        Builds a ``findAndModify`` command for this collection, sends it on a
        connection obtained from the client and returns the ``value`` field
        of the server reply.

        This must be a coroutine: the body awaits both the connection
        acquisition and the command round-trip.

        :param filter: Query selecting the document; must be a mapping.
        :param projection: Optional field selection, stored as ``fields``.
        :param sort: Optional sort specification, stored as ``sort``.
        :param upsert: Optional flag; validated as a boolean when given.
        :param return_document: Boolean sent as the command's ``new`` option.
        :param kwargs: Extra command fields merged into the command.  A
            ``collation`` entry is removed, validated and passed separately.
        :returns: ``out.get('value')`` from the reply (``None`` when absent).
        :raises ValueError: If ``return_document`` is not a ``bool``.
        """
        common.validate_is_mapping('filter', filter)
        if not isinstance(return_document, bool):
            raise ValueError('return_document must be ReturnDocument.BEFORE or ReturnDocument.AFTER')
        cmd = SON([('findAndModify', self.name),
                   ('query', filter),
                   ('new', return_document)])
        # Collation travels as a dedicated argument of connection.command(),
        # so it has to be removed before kwargs are merged into the command.
        collation = validate_collation_or_none(kwargs.pop('collation', None))
        cmd.update(kwargs)
        if projection is not None:
            cmd['fields'] = helpers._fields_list_to_dict(projection, 'projection')
        if sort is not None:
            cmd['sort'] = helpers._index_document(sort)
        if upsert is not None:
            common.validate_boolean('upsert', upsert)
            cmd['upsert'] = upsert

        connection = await self.database.client.get_connection()
        # Attach the collection's write concern only when the peer reports
        # wire version >= 4 and the caller did not supply one via kwargs.
        if connection.max_wire_version >= 4 and 'writeConcern' not in cmd:
            wc_doc = self.write_concern.document
            if wc_doc:
                cmd['writeConcern'] = wc_doc

        out = await connection.command(
            self.database.name, cmd, ReadPreference.PRIMARY, self.codec_options,
            allowable_errors=[_NO_OBJ_ERROR], collation=collation
        )
        helpers._check_write_command_response([(0, out)])
        return out.get('value')
errors.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __to_json__(self) -> t.Mapping[t.Any, t.Any]:
        """Build a JSON serializable dictionary describing this object.

        The result is a copy of ``self.rest`` in which the ``message``,
        ``description`` and ``code`` keys are set from this instance.

        :returns: A new dictionary; ``self.rest`` itself is not modified.
        """
        payload: t.MutableMapping[t.Any, t.Any] = dict(self.rest)
        # These keys always win over same-named entries coming from `rest`.
        payload.update(
            message=self.message,
            description=self.description,
            code=self.api_code.name,
        )
        return payload
files.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Extract the given archive into a temporary directory and return its
    file tree.

    The temporary directory is always removed before returning.

    :param werkzeug.datastructures.FileStorage file: The archive to extract.
    :param ignore_filter: Which files in the archive should be ignored. It
        may only be ``None`` when ``handle_ignore`` is
        ``IgnoreHandling.keep``; an empty filter is used in that case.
    :param handle_ignore: What to do with files that are ignored.
    :returns: ``None`` when the archive produced nothing, otherwise a file
        tree as generated by :py:func:`rename_directory_structure`.
    :raises ValueError: If ``ignore_filter`` is ``None`` while
        ``handle_ignore`` is not ``IgnoreHandling.keep``.
    """
    keep_everything = handle_ignore == IgnoreHandling.keep
    if ignore_filter is None:
        if not keep_everything:  # pragma: no cover
            raise ValueError
        ignore_filter = IgnoreFilterManager([])

    tmpdir = extract_to_temp(
        file,
        ignore_filter,
        handle_ignore,
    )
    # Offset of the last path component of the temporary directory; that
    # component is the key under which the extracted tree is stored.
    start = tmpdir.rstrip(os.sep).rfind(os.sep) + 1
    try:
        res = rename_directory_structure(tmpdir)[tmpdir[start:]]
        # Everything before the first dot of the uploaded file's name.
        filename: str = file.filename.split('.')[0]

        if not res:
            return None
        if len(res) > 1:
            # Several top level entries: wrap them in a directory named
            # after the archive.
            children = res if isinstance(res, list) else [res]
            return {filename: children}

        only_entry = res[0]
        if isinstance(only_entry, t.MutableMapping):
            # The single top level entry is already a directory.
            return only_entry
        return {filename: res}
    finally:
        shutil.rmtree(tmpdir)
tasks.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, name: str) -> None:
            """Start with an empty configuration mapping.

            :param name: Accepted but not used by this initializer.
            """
            self.conf: t.MutableMapping[t.Any, t.Any] = dict()
auth.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def is_valid_request(
        self,
        request: t.Any,
        parameters: t.Optional[t.MutableMapping[str, str]] = None,
        fake_method: t.Any = None,
        handle_error: bool = True
    ) -> bool:
        '''
        Validates an OAuth request using the python-oauth2 library:
            https://github.com/simplegeo/python-oauth2

        :param request: The request object handed to ``self.parse_request``.
        :param parameters: Extra parameters passed on to
            ``self.parse_request``. A fresh empty mapping is used when
            omitted, so no state is shared between calls.
        :param fake_method: Passed through to ``self.parse_request``.
        :param handle_error: When true, validation errors make this method
            return ``False``; when false they are re-raised.
        :returns: ``True`` if verification succeeded, ``False`` if it failed
            and ``handle_error`` is true.
        :raises oauth2.Error: On failure when ``handle_error`` is false.
        :raises ValueError: On failure when ``handle_error`` is false.
        '''
        if parameters is None:
            parameters = {}

        try:
            method, url, headers, parameters = self.parse_request(
                request, parameters, fake_method
            )

            oauth_request = oauth2.Request.from_request(
                method, url, headers=headers, parameters=parameters
            )

            self.oauth_server.verify_request(
                oauth_request, self.oauth_consumer, {}
            )
        except (oauth2.Error, ValueError):
            if handle_error:
                return False
            raise
        # Signature was valid
        return True
auth.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_request(
        self,
        req: 'flask.Request',
        parameters: t.MutableMapping[str, str] = None,
        fake_method: t.Any = None
    ) -> t.Tuple[str, str, t.MutableMapping[str, str], t.MutableMapping[str,
                                                                        str]]:
        '''
        Split a Flask request into ``(method, url, headers, form)``.

        The headers are returned as a plain ``dict`` and the form data as a
        copy, so neither result aliases the request's own containers.
        ``parameters`` and ``fake_method`` are not used by this
        implementation.
        '''
        method = req.method
        url = req.url
        headers = dict(req.headers)
        form_data = req.form.copy()
        return (method, url, headers, form_data)
models.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
        """Build a JSON serializable dictionary describing this object.

        :returns: A dictionary holding this object's ``name``, ``course``
            and ``id`` attributes under keys of the same names.
        """
        serialized: t.MutableMapping[str, t.Any] = dict(
            name=self.name,
            course=self.course,
            id=self.id,
        )
        return serialized
config.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_bool(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: bool
) -> None:
    """Store the boolean option ``item`` from ``parser`` in ``out``.

    ``default`` is used when ``parser.getboolean(item)`` yields ``None``.
    The stored value is always passed through ``bool``.
    """
    parsed = parser.getboolean(item)
    if parsed is None:
        parsed = default
    out[item] = bool(parsed)
config.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_float(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: float
) -> None:
    """Store the float option ``item`` from ``parser`` in ``out``.

    ``default`` is used when ``parser.getfloat(item)`` yields ``None``.
    The stored value is always passed through ``float``.
    """
    parsed = parser.getfloat(item)
    if parsed is None:
        parsed = default
    out[item] = float(parsed)
config.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def set_int(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: int
) -> None:
    """Store the integer option ``item`` from ``parser`` in ``out``.

    ``default`` is used when ``parser.getint(item)`` yields ``None``.
    The stored value is always passed through ``int``.
    """
    parsed = parser.getint(item)
    if parsed is None:
        parsed = default
    out[item] = int(parsed)
config.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_str(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: str
) -> None:
    """Store the string option ``item`` from ``parser`` in ``out``.

    ``default`` is used when ``parser.get(item)`` yields ``None``.
    The stored value is always passed through ``str``.
    """
    parsed = parser.get(item)
    if parsed is None:
        parsed = default
    out[item] = str(parsed)
device.py 文件源码 项目:btle-sniffer 作者: scipag 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, uuid: str, value: Optional[Sequence[int]],
                 flags: Sequence[str]):
        """Record the characteristic's identity and start without descriptors.

        :param uuid: Stored as ``self.uuid``.
        :param value: Stored as ``self.value``; may be ``None``.
        :param flags: Stored as ``self.flags``.
        """
        # Filled in later; initially empty.
        self.descriptors: MutableMapping[str, GATTDescriptor] = {}
        self.flags = flags
        self.value = value
        self.uuid = uuid
device.py 文件源码 项目:btle-sniffer 作者: scipag 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, uuid: str, primary: bool):
        """Record the service's identity and start without characteristics.

        :param uuid: Stored as ``self.uuid``.
        :param primary: Stored as ``self.primary``.
        """
        # Filled in later; initially empty.
        self.characteristics: MutableMapping[str, GATTCharacteristic] = {}
        self.primary = primary
        self.uuid = uuid
device.py 文件源码 项目:btle-sniffer 作者: scipag 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self,
                 path: str, address: str,
                 paired: bool, connected: bool, services_resolved: bool,
                 name: Optional[str] = None, device_class: Optional[int] = None,
                 appearance: Optional[int] = None,
                 uuids: Optional[Sequence[str]] = None,
                 rssi: Optional[int] = None, tx_power: Optional[int] = None,
                 manufacturer_data: Optional[Dict[int, Sequence[int]]] = None,
                 service_data: Optional[Dict[str, Sequence[int]]] = None) -> None:
        """Initialise the state tracked for a device.

        The object starts out active, with no services, and with
        ``first_seen`` and ``last_seen`` set to the same local timestamp.

        :param path: Stored as ``self.path``.
        :param address: Stored as ``self.address``.
        :param paired: Stored as ``self.paired``.
        :param connected: Stored as ``self.connected``.
        :param services_resolved: Stored as ``self.services_resolved``.
        :param name: Stored as ``self.name``.
        :param device_class: Stored as ``self.device_class``.
        :param appearance: Stored as ``self.appearance``.
        :param uuids: Initial UUIDs; stored as a set. The keys of
            ``service_data`` are added to this set.
        :param rssi: First RSSI reading; ``self.rssis`` is a list holding
            it, or an empty list when it is ``None``.
        :param tx_power: Stored as ``self.tx_power``.
        :param manufacturer_data: Mapping whose values are each wrapped in a
            one-element list in ``self.manufacturer_data``.
        :param service_data: Mapping whose values are each wrapped in a
            one-element list in ``self.service_data``.
        """
        self.active = True
        self.path = path
        self.address = address
        self.paired = paired
        self.connected = connected
        self.services_resolved = services_resolved
        self.name = name
        self.device_class = device_class
        self.appearance = appearance
        self.uuids = set(uuids) if uuids is not None else set()
        self.rssis = [rssi] if rssi is not None else []
        self.tx_power = tx_power

        # Take a single timestamp so that a freshly created device reports
        # identical first_seen and last_seen values.
        now = datetime.datetime.now()
        self.first_seen = now
        self.last_seen = now
        self.services: MutableMapping[str, GATTService] = {}

        # Each value is stored as the first element of a list.
        self.manufacturer_data = (
            {key: [value] for key, value in manufacturer_data.items()}
            if manufacturer_data is not None else {}
        )

        self.service_data = {}
        if service_data is not None:
            # UUIDs appearing only as service data keys are recorded too.
            self.uuids.update(service_data.keys())
            self.service_data = {
                key: [value] for key, value in service_data.items()
            }
azure_api.py 文件源码 项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
        """List the scale sets of a resource group, annotated with recent
        failures.

        Activity logs newer than ``TIMEOUT_PERIOD`` are scanned for entries
        whose status is ``Failed`` or whose ``statusCode`` property is
        ``Conflict``, skipping entries whose authorization action contains
        ``delete``. For every scale set with such failures a timeout is
        computed from the event timestamp plus ``TIMEOUT_PERIOD``.

        :param resource_group_name: Resource group to inspect.
        :returns: One ``AzureScaleSet`` per scale set in the group.
        """
        window_start = datetime.now(pytz.utc) - TIMEOUT_PERIOD
        filter_clause = "eventTimestamp ge '{}' and resourceGroupName eq '{}'".format(
            window_start, resource_group_name)
        select_clause = "authorization,status,subStatus,properties,resourceId,eventTimestamp"

        # Group the relevant log entries by the resource they belong to.
        failures_by_scale_set: MutableMapping[str, List[EventData]] = {}
        for entry in self._monitor_client.activity_logs.list(filter=filter_clause, select=select_clause):
            failed = entry.status and entry.status.value == 'Failed'
            if not failed:
                conflicted = entry.properties and entry.properties.get('statusCode') == 'Conflict'
                if not conflicted:
                    continue
            authorization = entry.authorization
            if authorization and authorization.action and 'delete' in authorization.action:
                continue
            failures_by_scale_set.setdefault(entry.resource_id, []).append(entry)

        scale_sets = []
        for scale_set in self._compute_client.virtual_machine_scale_sets.list(resource_group_name):
            newest_first = sorted(
                failures_by_scale_set.get(scale_set.id, []),
                key=lambda event: event.event_timestamp,
                reverse=True,
            )
            timeout_until = timeout_reason = None
            for failure in newest_first:
                if failure.properties:
                    status_message = json.loads(failure.properties.get('statusMessage', "{}"))
                else:
                    status_message = {}
                error_details = status_message.get('error', {})
                if 'message' in error_details:
                    # A failure carrying a detailed message takes precedence
                    # and ends the search.
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    timeout_reason = error_details['message']
                    break
                if timeout_until is None:
                    # Fallback taken from the newest failure without details;
                    # keep looking for one that has a message.
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    timeout_reason = failure.sub_status.localized_value

            # NOTE(review): assumes scale_set.tags is a mapping; confirm it
            # cannot be None for untagged scale sets.
            tags = scale_set.tags
            if PRIORITY_TAG in tags:
                priority = int(tags[PRIORITY_TAG])
            else:
                priority = None
            no_schedule_taints = json.loads(tags.get(NO_SCHEDULE_TAINTS_TAG, '{}'))

            scale_sets.append(AzureScaleSet(
                scale_set.location, resource_group_name, scale_set.name, scale_set.sku.name,
                scale_set.sku.capacity, scale_set.provisioning_state, timeout_until=timeout_until,
                timeout_reason=timeout_reason, priority=priority, no_schedule_taints=no_schedule_taints))
        return scale_sets
azure_api.py 文件源码 项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, delegate: AzureApi) -> None:
        """Wrap ``delegate`` and start with empty caches.

        :param delegate: The API object stored as ``self._delegate``.
        """
        self._delegate = delegate
        self._lock = RLock()
        # All three caches start out empty.
        self._instance_cache: MutableMapping[Tuple[str, str], List[AzureScaleSetInstance]] = dict()
        self._scale_set_cache: MutableMapping[str, List[AzureScaleSet]] = dict()
        self._remaining_instances_cache: MutableMapping[str, MutableMapping[str, int]] = dict()
guild.py 文件源码 项目:curious 作者: SunDwarf 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, guild: 'Guild',
                 channels: 'typing.MutableMapping[int, channel.Channel]'):
        """
        Store the owning guild and the wrapped channel mapping.

        :param guild: The :class:`~.Guild` object that owns this wrapper.
        :param channels: The dictionary of channels that this wrapper contains.
            It is kept by reference, not copied.
        """
        self._channels = channels
        self._guild = guild
guild.py 文件源码 项目:curious 作者: SunDwarf 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, guild: 'Guild',
                 roles: 'typing.MutableMapping[int, role.Role]'):
        """
        Store the owning guild and the wrapped role mapping.

        :param guild: The :class:`~.Guild` object that owns this wrapper.
        :param roles: The dictionary of roles that this wrapper contains.
            It is kept by reference, not copied.
        """
        self._roles = roles
        self._guild = guild
guild.py 文件源码 项目:curious 作者: SunDwarf 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, guild: 'Guild',
                 emojis: 'typing.MutableMapping[int, dt_emoji.Emoji]'):
        """
        Store the owning guild and the wrapped emoji mapping.

        :param guild: The :class:`.Guild` object that owns this wrapper.
        :param emojis: The dictionary of emojis that this wrapper contains.
            It is kept by reference, not copied.
        """
        self._emojis = emojis
        self._guild = guild
index_document.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def remove_versions(self, versions: typing.MutableMapping[str, str]):
        """
        Delete this document from every given index, provided the index holds
        the given version of the document.

        One bulk request with a ``delete`` action per index is sent. Failures
        do not raise; each failed item is logged as a warning.

        :param versions: Maps an index name to the document version expected
            in that index.
        """
        es_client = ElasticsearchClient.get(self.logger)

        delete_actions = []
        for index_name, version in versions.items():
            delete_actions.append({
                '_op_type': 'delete',
                '_index': index_name,
                '_type': ESDocType.doc.name,
                '_version': version,
                '_id': str(self.fqid),
            })

        # Only the error list of the bulk result is used.
        _, errors = bulk(es_client, raise_on_error=False, actions=delete_actions)
        for item in errors:
            self.logger.warning(f"Document deletion failed: {json.dumps(item)}")


问题


面经


文章

微信
公众号

扫码关注公众号