async def __find_and_modify(self, filter: dict, projection: Optional[Union[list, dict]],
                            sort: Optional[List[tuple]], upsert: Optional[bool] = None,
                            return_document: bool = ReturnDocument.BEFORE, **kwargs) -> MutableMapping:
    """Internal findAndModify helper."""
    common.validate_is_mapping('filter', filter)
    if not isinstance(return_document, bool):
        raise ValueError('return_document must be ReturnDocument.BEFORE '
                         'or ReturnDocument.AFTER')
    cmd = SON([('findAndModify', self.name),
               ('query', filter),
               ('new', return_document)])
    collation = validate_collation_or_none(kwargs.pop('collation', None))
    cmd.update(kwargs)
    if projection is not None:
        cmd['fields'] = helpers._fields_list_to_dict(projection, 'projection')
    if sort is not None:
        cmd['sort'] = helpers._index_document(sort)
    if upsert is not None:
        common.validate_boolean('upsert', upsert)
        cmd['upsert'] = upsert
    connection = await self.database.client.get_connection()
    if connection.max_wire_version >= 4 and 'writeConcern' not in cmd:
        wc_doc = self.write_concern.document
        if wc_doc:
            cmd['writeConcern'] = wc_doc
    out = await connection.command(
        self.database.name, cmd, ReadPreference.PRIMARY, self.codec_options,
        allowable_errors=[_NO_OBJ_ERROR], collation=collation
    )
    helpers._check_write_command_response([(0, out)])
    return out.get('value')
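For orientation, here is a minimal sketch of the findAndModify command document such a helper builds for an upserting call that returns the post-update document. The collection name, filter, and update below are made-up values; the field names follow MongoDB's findAndModify command.

from bson import SON

cmd = SON([('findAndModify', 'users'),               # self.name
           ('query', {'name': 'alice'}),             # the ``filter`` argument
           ('new', True),                            # ReturnDocument.AFTER
           ('update', {'$set': {'active': True}}),   # passed through **kwargs
           ('sort', SON([('created', 1)])),          # from helpers._index_document(sort)
           ('upsert', True)])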
def __to_json__(self) -> t.Mapping[t.Any, t.Any]:
    """Creates a JSON serializable representation of this object.

    :returns: This APIException instance as a dictionary.
    """
    ret = dict(self.rest)  # type: t.MutableMapping[t.Any, t.Any]
    ret['message'] = self.message
    ret['description'] = self.description
    ret['code'] = self.api_code.name
    return ret
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Extracts all files in archive with random name to uploads folder.

    :param werkzeug.datastructures.FileStorage file: The file to extract.
    :param ignore_filter: What files should be ignored in the given archive.
        This can only be None when ``handle_ignore`` is
        ``IgnoreHandling.keep``.
    :param handle_ignore: How should ignored files be handled.
    :returns: A file tree as generated by
        :py:func:`rename_directory_structure`.
    """
    if handle_ignore == IgnoreHandling.keep and ignore_filter is None:
        ignore_filter = IgnoreFilterManager([])
    elif ignore_filter is None:  # pragma: no cover
        raise ValueError
    tmpdir = extract_to_temp(
        file,
        ignore_filter,
        handle_ignore,
    )
    rootdir = tmpdir.rstrip(os.sep)
    start = rootdir.rfind(os.sep) + 1
    try:
        res = rename_directory_structure(tmpdir)[tmpdir[start:]]
        filename: str = file.filename.split('.')[0]
        if not res:
            return None
        elif len(res) > 1:
            return {filename: res if isinstance(res, list) else [res]}
        elif not isinstance(res[0], t.MutableMapping):
            return {filename: res}
        else:
            return res[0]
    finally:
        shutil.rmtree(tmpdir)
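A usage sketch, assuming a tar.gz archive on disk wrapped in a werkzeug FileStorage (the file name is made up); extract() returns None when the archive turns out to be empty.

from werkzeug.datastructures import FileStorage

with open('submission.tar.gz', 'rb') as archive:               # hypothetical upload
    upload = FileStorage(stream=archive, filename='submission.tar.gz')
    tree = extract(upload, handle_ignore=IgnoreHandling.keep)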
def __init__(self, name: str) -> None:
    self.conf: t.MutableMapping[t.Any, t.Any] = {}
def is_valid_request(
    self,
    request: t.Any,
    parameters: t.MutableMapping[str, str] = {},
    fake_method: t.Any = None,
    handle_error: bool = True
) -> bool:
    '''
    Validates an OAuth request using the python-oauth2 library:
    https://github.com/simplegeo/python-oauth2
    '''

    def handle(e: oauth2.Error) -> bool:
        if handle_error:
            return False
        else:
            raise e

    try:
        method, url, headers, parameters = self.parse_request(
            request, parameters, fake_method
        )
        oauth_request = oauth2.Request.from_request(
            method, url, headers=headers, parameters=parameters
        )
        self.oauth_server.verify_request(
            oauth_request, self.oauth_consumer, {}
        )
    except oauth2.Error as e:
        return handle(e)
    except ValueError as e:
        return handle(e)
    # Signature was valid
    return True
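For context, a minimal sketch of the python-oauth2 objects this method relies on; the key and secret are placeholders, and in the class above they would be stored as self.oauth_consumer and self.oauth_server.

import oauth2

consumer = oauth2.Consumer(key='lti-key', secret='lti-secret')    # placeholder credentials
server = oauth2.Server()
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())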
def parse_request(
    self,
    req: 'flask.Request',
    parameters: t.MutableMapping[str, str] = None,
    fake_method: t.Any = None
) -> t.Tuple[str, str, t.MutableMapping[str, str], t.MutableMapping[str, str]]:
    '''
    Parse a Flask request into the ``(method, url, headers, parameters)``
    tuple expected by oauth2.
    '''
    return (req.method, req.url, dict(req.headers), req.form.copy())
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
    """Creates a JSON serializable representation of this object."""
    return {
        'name': self.name,
        'course': self.course,
        'id': self.id,
    }
def set_bool(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: bool
) -> None:
    val = parser.getboolean(item)
    out[item] = bool(default if val is None else val)
def set_float(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: float
) -> None:
    val = parser.getfloat(item)
    out[item] = float(default if val is None else val)
def set_int(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: int
) -> None:
    val = parser.getint(item)
    out[item] = int(default if val is None else val)
def set_str(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: str
) -> None:
    val = parser.get(item)
    out[item] = str(default if val is None else val)
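A usage sketch for these helpers, assuming values are read from a configparser section (the file name, option names, and defaults are made up); a SectionProxy returns None for missing options, which is exactly what the helpers test for.

import configparser
import typing as t

parser = configparser.ConfigParser()
parser.read('config.ini')                       # hypothetical config file
section = parser['DEFAULT']                     # SectionProxy; get*() returns None when unset

out: t.MutableMapping[str, t.Any] = {}
set_bool(out, section, 'debug', False)          # keeps the default when the option is missing
set_int(out, section, 'max_upload_size', 64)
set_str(out, section, 'site_name', 'example')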
def __init__(self, uuid: str, value: Optional[Sequence[int]],
             flags: Sequence[str]):
    self.uuid = uuid
    self.value = value
    self.flags = flags
    self.descriptors: MutableMapping[str, GATTDescriptor] = dict()
def __init__(self, uuid: str, primary: bool):
    self.uuid = uuid
    self.primary = primary
    self.characteristics: MutableMapping[str, GATTCharacteristic] = dict()
def __init__(self,
             path: str, address: str,
             paired: bool, connected: bool, services_resolved: bool,
             name: Optional[str] = None, device_class: Optional[int] = None,
             appearance: Optional[int] = None, uuids: Sequence[str] = None,
             rssi: int = None, tx_power: int = None,
             manufacturer_data: Dict[int, Sequence[int]] = None,
             service_data: Dict[str, Sequence[int]] = None) -> None:
    self.active = True
    self.path = path
    self.address = address
    self.paired = paired
    self.connected = connected
    self.services_resolved = services_resolved
    self.name = name
    self.device_class = device_class
    self.appearance = appearance
    self.uuids = set(uuids) if uuids is not None else set()
    self.rssis = [rssi] if rssi is not None else list()
    self.tx_power = tx_power
    self.first_seen = datetime.datetime.now()
    self.last_seen = datetime.datetime.now()
    self.services: MutableMapping[str, GATTService] = dict()
    self.manufacturer_data = dict()
    if manufacturer_data is not None:
        for k, v in manufacturer_data.items():
            self.manufacturer_data[k] = [v]
    self.service_data = dict()
    if service_data is not None:
        self.uuids = self.uuids.union(service_data.keys())
        for k, v in service_data.items():
            self.service_data[k] = [v]
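A sketch of how these per-UUID MutableMapping containers nest, using the class names from the annotations above (GATTService, GATTCharacteristic) and the standard Battery Service UUIDs as example values.

service = GATTService(uuid='0000180f-0000-1000-8000-00805f9b34fb', primary=True)  # Battery Service
battery_level = GATTCharacteristic(
    uuid='00002a19-0000-1000-8000-00805f9b34fb',   # Battery Level characteristic
    value=[100],
    flags=['read', 'notify'],
)
service.characteristics[battery_level.uuid] = battery_level
# A device built by the __init__ above would then hold:
# device.services[service.uuid] = service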
def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
    fifteen_minutes_ago = datetime.now(pytz.utc) - TIMEOUT_PERIOD
    filter_clause = "eventTimestamp ge '{}' and resourceGroupName eq '{}'".format(fifteen_minutes_ago, resource_group_name)
    select_clause = "authorization,status,subStatus,properties,resourceId,eventTimestamp"
    failures_by_scale_set: MutableMapping[str, List[EventData]] = {}
    for log in self._monitor_client.activity_logs.list(filter=filter_clause, select=select_clause):
        if (log.status and log.status.value == 'Failed') or (log.properties and log.properties.get('statusCode') == 'Conflict'):
            if log.authorization and log.authorization.action and 'delete' in log.authorization.action:
                continue
            failures_by_scale_set.setdefault(log.resource_id, []).append(log)
    result = []
    for scale_set in self._compute_client.virtual_machine_scale_sets.list(resource_group_name):
        failures = sorted(failures_by_scale_set.get(scale_set.id, []), key=lambda x: x.event_timestamp, reverse=True)
        timeout_until = None
        timeout_reason = None
        for failure in failures:
            status_message = json.loads(failure.properties.get('statusMessage', "{}")) if failure.properties else {}
            error_details = status_message.get('error', {})
            if 'message' in error_details:
                timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                timeout_reason = error_details['message']
                # Stop if we found a message with details
                break
            if timeout_until is None:
                timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                timeout_reason = failure.sub_status.localized_value
        priority = int(scale_set.tags[PRIORITY_TAG]) if PRIORITY_TAG in scale_set.tags else None
        no_schedule_taints = json.loads(scale_set.tags.get(NO_SCHEDULE_TAINTS_TAG, '{}'))
        result.append(AzureScaleSet(scale_set.location, resource_group_name, scale_set.name, scale_set.sku.name,
                                    scale_set.sku.capacity, scale_set.provisioning_state, timeout_until=timeout_until,
                                    timeout_reason=timeout_reason, priority=priority, no_schedule_taints=no_schedule_taints))
    return result
def __init__(self, delegate: AzureApi) -> None:
    self._delegate = delegate
    self._lock = RLock()
    self._instance_cache: MutableMapping[Tuple[str, str], List[AzureScaleSetInstance]] = {}
    self._scale_set_cache: MutableMapping[str, List[AzureScaleSet]] = {}
    self._remaining_instances_cache: MutableMapping[str, MutableMapping[str, int]] = {}
def __init__(self, guild: 'Guild',
             channels: 'typing.MutableMapping[int, channel.Channel]'):
    """
    :param guild: The :class:`~.Guild` object that owns this wrapper.
    :param channels: The dictionary of channels that this wrapper contains.
    """
    self._guild = guild
    self._channels = channels
def __init__(self, guild: 'Guild',
             roles: 'typing.MutableMapping[int, role.Role]'):
    """
    :param guild: The :class:`~.Guild` object that owns this wrapper.
    :param roles: The dictionary of roles that this wrapper contains.
    """
    self._guild = guild
    self._roles = roles
def __init__(self, guild: 'Guild',
             emojis: 'typing.MutableMapping[int, dt_emoji.Emoji]'):
    """
    :param guild: The :class:`.Guild` object that owns this wrapper.
    :param emojis: The dictionary of emojis that this wrapper contains.
    """
    self._guild = guild
    self._emojis = emojis
def remove_versions(self, versions: typing.MutableMapping[str, str]):
    """
    Remove this document from each given index, provided that the index
    still contains the given version of this document.
    """
    es_client = ElasticsearchClient.get(self.logger)
    num_ok, errors = bulk(es_client, raise_on_error=False, actions=[{
        '_op_type': 'delete',
        '_index': index_name,
        '_type': ESDocType.doc.name,
        '_version': version,
        '_id': str(self.fqid),
    } for index_name, version in versions.items()])
    for item in errors:
        self.logger.warning(f"Document deletion failed: {json.dumps(item)}")
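A usage sketch, assuming a document object exposing remove_versions; the index names and versions below are made up, and each entry maps an index name to the document version that must still be stored there for the delete to apply.

import typing

versions: typing.MutableMapping[str, str] = {
    'docs-index-v1': '3',    # hypothetical index name -> expected version
    'docs-index-v2': '3',
}
document.remove_versions(versions)   # deletes only where the stored version still matches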