def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
"""Unpack a response from the database.
Check the response for errors and unpack, returning a dictionary
containing the response data.
Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
OperationFailure.
:Parameters:
- `response`: byte string as returned from the database
- `cursor_id` (optional): cursor_id we sent to get this response -
used for raising an informative exception when we get cursor id not
valid at server response
- `codec_options` (optional): an instance of
:class:`~bson.codec_options.CodecOptions`
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
assert cursor_id is not None
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
# Fake the ok field if it doesn't exist.
error_object.setdefault("ok", 0)
if error_object["$err"].startswith("not master"):
raise NotMasterError(error_object["$err"], error_object)
elif error_object.get("code") == 50:
raise ExecutionTimeout(error_object.get("$err"),
error_object.get("code"),
error_object)
raise OperationFailure("database error: %s" %
error_object.get("$err"),
error_object.get("code"),
error_object)
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
"starting_from": struct.unpack("<i", response[12:16])[0],
"number_returned": struct.unpack("<i", response[16:20])[0],
"data": bson.decode_all(response[20:], codec_options)}
assert len(result["data"]) == result["number_returned"]
return result
python类codec_options()的实例源码
def _first_batch(sock_info, db, coll, query, ntoreturn,
slave_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, query, None,
codec_options, read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN)
name = next(iter(cmd))
duration = None
publish = listeners.enabled_for_commands
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(slave_ok,
sock_info.is_mongos)
if publish:
encoding_duration = datetime.datetime.now() - start
listeners.publish_command_start(
cmd, db, request_id, sock_info.address)
start = datetime.datetime.now()
sock_info.send_message(msg, max_doc_size)
response = sock_info.receive_message(1, request_id)
try:
result = _unpack_response(response, None, codec_options)
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
if isinstance(exc, (NotMasterError, OperationFailure)):
failure = exc.details
else:
failure = _convert_exception(exc)
listeners.publish_command_failure(
duration, failure, name, request_id, sock_info.address)
raise
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
listeners.publish_command_success(
duration, result, name, request_id, sock_info.address)
return result
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
"""Unpack a response from the database.
Check the response for errors and unpack, returning a dictionary
containing the response data.
Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
OperationFailure.
:Parameters:
- `response`: byte string as returned from the database
- `cursor_id` (optional): cursor_id we sent to get this response -
used for raising an informative exception when we get cursor id not
valid at server response
- `codec_options` (optional): an instance of
:class:`~bson.codec_options.CodecOptions`
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
assert cursor_id is not None
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
# Fake the ok field if it doesn't exist.
error_object.setdefault("ok", 0)
if error_object["$err"].startswith("not master"):
raise NotMasterError(error_object["$err"], error_object)
elif error_object.get("code") == 50:
raise ExecutionTimeout(error_object.get("$err"),
error_object.get("code"),
error_object)
raise OperationFailure("database error: %s" %
error_object.get("$err"),
error_object.get("code"),
error_object)
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
"starting_from": struct.unpack("<i", response[12:16])[0],
"number_returned": struct.unpack("<i", response[16:20])[0],
"data": bson.decode_all(response[20:], codec_options)}
assert len(result["data"]) == result["number_returned"]
return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
slave_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, ntoreturn, query, None,
codec_options, read_preference, 0, 0, DEFAULT_READ_CONCERN)
name = next(iter(cmd))
duration = None
publish = listeners.enabled_for_commands
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(slave_ok,
sock_info.is_mongos)
if publish:
encoding_duration = datetime.datetime.now() - start
listeners.publish_command_start(
cmd, db, request_id, sock_info.address)
start = datetime.datetime.now()
sock_info.send_message(msg, max_doc_size)
response = sock_info.receive_message(1, request_id)
try:
result = _unpack_response(response, None, codec_options)
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
if isinstance(exc, (NotMasterError, OperationFailure)):
failure = exc.details
else:
failure = _convert_exception(exc)
listeners.publish_command_failure(
duration, failure, name, request_id, sock_info.address)
raise
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
listeners.publish_command_success(
duration, result, name, request_id, sock_info.address)
return result
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
"""Unpack a response from the database.
Check the response for errors and unpack, returning a dictionary
containing the response data.
Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
OperationFailure.
:Parameters:
- `response`: byte string as returned from the database
- `cursor_id` (optional): cursor_id we sent to get this response -
used for raising an informative exception when we get cursor id not
valid at server response
- `codec_options` (optional): an instance of
:class:`~bson.codec_options.CodecOptions`
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
assert cursor_id is not None
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
# Fake the ok field if it doesn't exist.
error_object.setdefault("ok", 0)
if error_object["$err"].startswith("not master"):
raise NotMasterError(error_object["$err"], error_object)
elif error_object.get("code") == 50:
raise ExecutionTimeout(error_object.get("$err"),
error_object.get("code"),
error_object)
raise OperationFailure("database error: %s" %
error_object.get("$err"),
error_object.get("code"),
error_object)
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
"starting_from": struct.unpack("<i", response[12:16])[0],
"number_returned": struct.unpack("<i", response[16:20])[0],
"data": bson.decode_all(response[20:], codec_options)}
assert len(result["data"]) == result["number_returned"]
return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
slave_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, ntoreturn, query, None,
codec_options, read_preference, 0, 0, DEFAULT_READ_CONCERN)
name = next(iter(cmd))
duration = None
publish = listeners.enabled_for_commands
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(slave_ok,
sock_info.is_mongos)
if publish:
encoding_duration = datetime.datetime.now() - start
listeners.publish_command_start(
cmd, db, request_id, sock_info.address)
start = datetime.datetime.now()
sock_info.send_message(msg, max_doc_size)
response = sock_info.receive_message(1, request_id)
try:
result = _unpack_response(response, None, codec_options)
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
if isinstance(exc, (NotMasterError, OperationFailure)):
failure = exc.details
else:
failure = _convert_exception(exc)
listeners.publish_command_failure(
duration, failure, name, request_id, sock_info.address)
raise
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
listeners.publish_command_success(
duration, result, name, request_id, sock_info.address)
return result