async def insert_one(self, document: MutableMapping, bypass_document_validation: bool = False,
                     check_keys: bool = True) -> InsertOneResult:
    """Insert a single document into this collection.

    If ``document`` has no ``'_id'`` key and is not a ``RawBSONDocument``,
    a fresh ``ObjectId`` is stored under ``'_id'`` -- the caller's mapping
    is mutated in place.

    With an acknowledged write concern (``w != 0``) an ``insert`` command
    is sent and its response is checked. Otherwise an insert message is
    built and sent without waiting for a reply.

    Args:
        document: The document to insert; validated with
            ``common.validate_is_document_type``.
        bypass_document_validation: When true and the connection reports
            ``max_wire_version >= 4``, ``bypassDocumentValidation`` is set
            on the insert command. Not allowed together with an
            unacknowledged write concern on such connections.
        check_keys: Forwarded to the command / message encoder.

    Returns:
        An ``InsertOneResult`` built from the document's ``'_id'``
        (``None`` for a ``RawBSONDocument``) and the acknowledged flag.

    Raises:
        OperationFailure: If ``bypass_document_validation`` is requested
            with an unacknowledged write concern on a connection whose
            ``max_wire_version`` is at least 4.
    """
    common.validate_is_document_type('document', document)
    if '_id' not in document and not isinstance(document, RawBSONDocument):
        document['_id'] = ObjectId()

    write_concern = self.write_concern.document
    # A write concern of w=0 means the server sends no acknowledgement.
    acknowledged = write_concern.get('w') != 0

    connection = await self.database.client.get_connection()

    if acknowledged:
        command = SON([('insert', self.name),
                       ('ordered', True),
                       ('documents', [document])])

        if bypass_document_validation and connection.max_wire_version >= 4:
            command['bypassDocumentValidation'] = True

        result = await connection.command(
            self.database.name, command, ReadPreference.PRIMARY,
            self.__write_response_codec_options,
            check_keys=check_keys
        )

        helpers._check_write_command_response([(0, result)])
    else:
        if bypass_document_validation and connection.max_wire_version >= 4:
            # The message must be ONE string: a comma between the two
            # halves would pass the second half as a separate positional
            # argument instead of as part of the error text.
            raise OperationFailure('Cannot set bypass_document_validation with'
                                   ' unacknowledged write concern')

        _, msg, _ = message.insert(
            str(self), [document], check_keys,
            acknowledged, write_concern, False,
            self.__write_response_codec_options
        )
        # NOTE(review): not awaited in the original; presumably a
        # synchronous fire-and-forget write -- confirm against Connection.
        connection.send_message(msg)

    # A RawBSONDocument is never given an '_id' here, so none is reported.
    document_id = document['_id'] if not isinstance(document, RawBSONDocument) else None

    return InsertOneResult(document_id, acknowledged)
评论列表
文章目录