def _command(self, connection: 'aiomongo.Connection', command: Union[str, dict], value: Any=1,
check: bool = True, allowable_errors: Optional[List[str]] = None,
read_preference: Union[_ALL_READ_PREFERENCES] = ReadPreference.PRIMARY,
codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS, **kwargs) -> MutableMapping:
"""Internal command helper."""
if isinstance(command, str):
command = SON([(command, value)])
command.update(kwargs)
return await connection.command(
self.name, command, read_preference, codec_options, check=check,
allowable_errors=allowable_errors
)
评论列表
文章目录