def __init__(self, flags, db, coll, ntoskip, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern, collation):
    """Store every constructor argument on the instance.

    Each parameter is kept unchanged under an attribute of the same
    name; ``name`` is always set to ``'find'``.
    """
    self.name = 'find'
    self.flags = flags
    self.db, self.coll = db, coll
    self.ntoskip = ntoskip
    self.spec, self.fields = spec, fields
    self.limit, self.batch_size = limit, batch_size
    self.codec_options = codec_options
    self.read_preference = read_preference
    self.read_concern = read_concern
    self.collation = collation
# python类codec_options()的实例源码 -- example source code for the Python class codec_options()
def __init__(self, flags, db, coll, ntoskip, ntoreturn, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Store the constructor arguments as same-named attributes.

    Nothing is validated or copied here; ``name`` is always ``'find'``.
    """
    self.name = 'find'
    self.flags = flags
    self.db, self.coll = db, coll
    self.ntoskip, self.ntoreturn = ntoskip, ntoreturn
    self.spec, self.fields = spec, fields
    self.limit, self.batch_size = limit, batch_size
    self.codec_options = codec_options
    self.read_preference = read_preference
    self.read_concern = read_concern
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Build the query message, optionally with the slaveOk bit set.

    With ``use_cmd`` the message targets the ``$cmd`` namespace, takes the
    first element of ``self.as_command()`` as its spec and asks for -1
    documents; otherwise it targets ``self.coll`` with ``self.spec`` and
    ``self.ntoreturn``.  When ``is_mongos`` is true the spec is passed
    through ``_maybe_add_read_preference`` first.  Returns whatever
    ``query(...)`` returns.
    """
    # Bit value 4 is the slaveOk flag.
    flags = (self.flags | 4) if set_slave_ok else self.flags

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        # All DB commands return 1 document
        ntoreturn = -1
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        ntoreturn = self.ntoreturn

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, flags, db, coll, ntoskip, ntoreturn, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Keep each argument on the instance under its own name.

    The ``name`` attribute is fixed to ``'find'``.
    """
    self.flags, self.name = flags, 'find'
    self.db, self.coll = db, coll
    self.spec, self.fields = spec, fields
    self.ntoskip, self.ntoreturn = ntoskip, ntoreturn
    self.limit, self.batch_size = limit, batch_size
    self.read_preference, self.read_concern = read_preference, read_concern
    self.codec_options = codec_options
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Return the query message for this operation.

    ``set_slave_ok`` ORs bit value 4 (slaveOk) into the flags.  ``use_cmd``
    switches the namespace to ``$cmd``, the spec to the first element of
    ``self.as_command()`` and the number to return to -1.  ``is_mongos``
    routes the spec through ``_maybe_add_read_preference``.  The return
    value is the result of ``query(...)``.
    """
    flags = self.flags
    if set_slave_ok:
        # Set the slaveOk bit.
        flags |= 4

    if not use_cmd:
        target = self.coll
        spec = self.spec
        ntoreturn = self.ntoreturn
    else:
        target = "$cmd"
        spec = self.as_command()[0]
        ntoreturn = -1  # All DB commands return 1 document
    ns = _UJOIN % (self.db, target)

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, flags, db, coll, ntoskip, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Store the constructor arguments as same-named attributes.

    Nothing is validated or copied here; ``name`` is always ``'find'``.
    """
    self.name = 'find'
    self.flags = flags
    self.db, self.coll = db, coll
    self.ntoskip = ntoskip
    self.spec, self.fields = spec, fields
    self.limit, self.batch_size = limit, batch_size
    self.codec_options = codec_options
    self.read_preference = read_preference
    self.read_concern = read_concern
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Build the query message, optionally with the slaveOk bit set.

    With ``use_cmd`` the message targets the ``$cmd`` namespace, takes the
    first element of ``self.as_command()`` as its spec and asks for -1
    documents.  Otherwise the number to return is derived from
    ``self.batch_size`` and ``self.limit``.  When ``is_mongos`` is true
    the spec is passed through ``_maybe_add_read_preference``.  Returns
    whatever ``query(...)`` returns.
    """
    # Bit value 4 is the slaveOk flag.
    flags = (self.flags | 4) if set_slave_ok else self.flags

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        # All DB commands return 1 document
        ntoreturn = -1
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        # OP_QUERY treats ntoreturn of -1 and 1 the same, return
        # one document and close the cursor. We have to use 2 for
        # batch size if 1 is specified.
        ntoreturn = 2 if self.batch_size == 1 else self.batch_size
        if self.limit:
            # A non-zero limit caps the batch; with no batch size the
            # limit itself is used.
            ntoreturn = min(self.limit, ntoreturn) if ntoreturn else self.limit

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Store the constructor arguments as same-named attributes.

    ``max_await_time_ms`` is optional and defaults to ``None``.
    """
    self.db, self.coll = db, coll
    self.ntoreturn, self.cursor_id = ntoreturn, cursor_id
    self.codec_options = codec_options
    self.max_await_time_ms = max_await_time_ms
def __init__(self, flags, db, coll, ntoskip, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Keep each argument on the instance under its own name.

    The ``name`` attribute is fixed to ``'find'``.
    """
    self.flags, self.name = flags, 'find'
    self.db, self.coll = db, coll
    self.spec, self.fields = spec, fields
    self.ntoskip = ntoskip
    self.limit, self.batch_size = limit, batch_size
    self.read_preference, self.read_concern = read_preference, read_concern
    self.codec_options = codec_options
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Return the query message for this operation.

    ``set_slave_ok`` ORs bit value 4 (slaveOk) into the flags.  ``use_cmd``
    switches the namespace to ``$cmd``, the spec to the first element of
    ``self.as_command()`` and the number to return to -1; without it the
    number to return comes from ``self.batch_size`` capped by
    ``self.limit``.  ``is_mongos`` routes the spec through
    ``_maybe_add_read_preference``.  The return value is the result of
    ``query(...)``.
    """
    flags = self.flags
    if set_slave_ok:
        # Set the slaveOk bit.
        flags |= 4

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        ntoreturn = -1  # All DB commands return 1 document
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        # OP_QUERY treats ntoreturn of -1 and 1 the same, return
        # one document and close the cursor. We have to use 2 for
        # batch size if 1 is specified.
        if self.batch_size == 1:
            ntoreturn = 2
        else:
            ntoreturn = self.batch_size
        if self.limit:
            if ntoreturn:
                ntoreturn = min(self.limit, ntoreturn)
            else:
                ntoreturn = self.limit

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Keep each argument on the instance under its own name.

    ``max_await_time_ms`` may be omitted, in which case ``None`` is stored.
    """
    self.max_await_time_ms = max_await_time_ms
    self.codec_options = codec_options
    self.cursor_id = cursor_id
    self.ntoreturn = ntoreturn
    self.db, self.coll = db, coll
def __init__(self, flags, db, coll, ntoskip, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Record all constructor arguments on the instance.

    Every parameter is stored unchanged under an attribute of the same
    name; ``name`` is set to ``'find'``.
    """
    self.db, self.coll = db, coll
    self.flags, self.ntoskip = flags, ntoskip
    self.spec, self.fields = spec, fields
    self.codec_options = codec_options
    self.read_preference = read_preference
    self.read_concern = read_concern
    self.limit, self.batch_size = limit, batch_size
    self.name = 'find'
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Build the query message, optionally with the slaveOk bit set.

    With ``use_cmd`` the message targets the ``$cmd`` namespace, takes the
    first element of ``self.as_command()`` as its spec and asks for -1
    documents.  Otherwise the number to return is derived from
    ``self.batch_size`` and ``self.limit``.  When ``is_mongos`` is true
    the spec is passed through ``_maybe_add_read_preference``.  Returns
    whatever ``query(...)`` returns.
    """
    # Bit value 4 is the slaveOk flag.
    flags = (self.flags | 4) if set_slave_ok else self.flags

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        # All DB commands return 1 document
        ntoreturn = -1
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        # OP_QUERY treats ntoreturn of -1 and 1 the same, return
        # one document and close the cursor. We have to use 2 for
        # batch size if 1 is specified.
        ntoreturn = 2 if self.batch_size == 1 else self.batch_size
        if self.limit:
            # A non-zero limit caps the batch; with no batch size the
            # limit itself is used.
            ntoreturn = min(self.limit, ntoreturn) if ntoreturn else self.limit

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Store the constructor arguments as same-named attributes.

    ``max_await_time_ms`` is optional and defaults to ``None``.
    """
    self.db, self.coll = db, coll
    self.ntoreturn, self.cursor_id = ntoreturn, cursor_id
    self.codec_options = codec_options
    self.max_await_time_ms = max_await_time_ms
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Return the query message for this operation.

    ``set_slave_ok`` ORs bit value 4 (slaveOk) into the flags.  ``use_cmd``
    switches the namespace to ``$cmd``, the spec to the first element of
    ``self.as_command()`` and the number to return to -1; without it the
    number to return comes from ``self.batch_size`` capped by
    ``self.limit``.  ``is_mongos`` routes the spec through
    ``_maybe_add_read_preference``.  The return value is the result of
    ``query(...)``.
    """
    flags = self.flags
    if set_slave_ok:
        # Set the slaveOk bit.
        flags |= 4

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        ntoreturn = -1  # All DB commands return 1 document
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        # OP_QUERY treats ntoreturn of -1 and 1 the same, return
        # one document and close the cursor. We have to use 2 for
        # batch size if 1 is specified.
        if self.batch_size == 1:
            ntoreturn = 2
        else:
            ntoreturn = self.batch_size
        if self.limit:
            if ntoreturn:
                ntoreturn = min(self.limit, ntoreturn)
            else:
                ntoreturn = self.limit

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Keep each argument on the instance under its own name.

    ``max_await_time_ms`` may be omitted, in which case ``None`` is stored.
    """
    self.max_await_time_ms = max_await_time_ms
    self.codec_options = codec_options
    self.cursor_id = cursor_id
    self.ntoreturn = ntoreturn
    self.db, self.coll = db, coll
def __init__(self, flags, db, coll, ntoskip, spec, fields,
             codec_options, read_preference, limit,
             batch_size, read_concern):
    """Store the constructor arguments as same-named attributes.

    Nothing is validated or copied here; ``name`` is always ``'find'``.
    """
    self.name = 'find'
    self.flags = flags
    self.db, self.coll = db, coll
    self.ntoskip = ntoskip
    self.spec, self.fields = spec, fields
    self.limit, self.batch_size = limit, batch_size
    self.codec_options = codec_options
    self.read_preference = read_preference
    self.read_concern = read_concern
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
    """Build the query message, optionally with the slaveOk bit set.

    With ``use_cmd`` the message targets the ``$cmd`` namespace, takes the
    first element of ``self.as_command()`` as its spec and asks for -1
    documents.  Otherwise the number to return is derived from
    ``self.batch_size`` and ``self.limit``.  When ``is_mongos`` is true
    the spec is passed through ``_maybe_add_read_preference``.  Returns
    whatever ``query(...)`` returns.
    """
    # Bit value 4 is the slaveOk flag.
    flags = (self.flags | 4) if set_slave_ok else self.flags

    if use_cmd:
        ns = _UJOIN % (self.db, "$cmd")
        spec = self.as_command()[0]
        # All DB commands return 1 document
        ntoreturn = -1
    else:
        ns = _UJOIN % (self.db, self.coll)
        spec = self.spec
        # OP_QUERY treats ntoreturn of -1 and 1 the same, return
        # one document and close the cursor. We have to use 2 for
        # batch size if 1 is specified.
        ntoreturn = 2 if self.batch_size == 1 else self.batch_size
        if self.limit:
            # A non-zero limit caps the batch; with no batch size the
            # limit itself is used.
            ntoreturn = min(self.limit, ntoreturn) if ntoreturn else self.limit

    if is_mongos:
        spec = _maybe_add_read_preference(spec, self.read_preference)

    return query(flags, ns, self.ntoskip, ntoreturn,
                 spec, self.fields, self.codec_options)
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Store the constructor arguments as same-named attributes.

    ``max_await_time_ms`` is optional and defaults to ``None``.
    """
    self.db, self.coll = db, coll
    self.ntoreturn, self.cursor_id = ntoreturn, cursor_id
    self.codec_options = codec_options
    self.max_await_time_ms = max_await_time_ms
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Keep each argument on the instance under its own name.

    ``max_await_time_ms`` may be omitted, in which case ``None`` is stored.
    """
    self.max_await_time_ms = max_await_time_ms
    self.codec_options = codec_options
    self.cursor_id = cursor_id
    self.ntoreturn = ntoreturn
    self.db, self.coll = db, coll
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
             max_await_time_ms=None):
    """Record all constructor arguments on the instance.

    Each value is stored unchanged under an attribute of the same name;
    ``max_await_time_ms`` defaults to ``None``.
    """
    self.cursor_id, self.ntoreturn = cursor_id, ntoreturn
    self.db, self.coll = db, coll
    self.codec_options, self.max_await_time_ms = (
        codec_options, max_await_time_ms)
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
    """Decode a database reply, raising if it reports an error.

    Returns a dict with the keys ``cursor_id``, ``starting_from``,
    ``number_returned`` and ``data`` (the decoded documents).

    Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
    OperationFailure.

    :Parameters:
      - `response`: byte string as returned from the database
      - `cursor_id` (optional): cursor_id we sent to get this response -
        used for raising an informative exception when we get cursor id not
        valid at server response
      - `codec_options` (optional): an instance of
        :class:`~bson.codec_options.CodecOptions`
    """
    (flags,) = struct.unpack("<i", response[:4])

    if flags & 1:
        # Shouldn't get this response if we aren't doing a getMore
        assert cursor_id is not None

        # Fake a getMore command response. OP_GET_MORE provides no document.
        msg = "Cursor not found, cursor id: %d" % (cursor_id,)
        raise CursorNotFound(msg, 43, {"ok": 0, "errmsg": msg, "code": 43})

    if flags & 2:
        # The reply body is a single error document.
        err_doc = bson.BSON(response[20:]).decode()
        # Fake the ok field if it doesn't exist.
        err_doc.setdefault("ok", 0)
        err_text = err_doc["$err"]
        if err_text.startswith("not master"):
            raise NotMasterError(err_text, err_doc)
        err_code = err_doc.get("code")
        if err_code == 50:
            raise ExecutionTimeout(err_text, err_code, err_doc)
        raise OperationFailure("database error: %s" % err_text,
                               err_code,
                               err_doc)

    # Fixed-size little-endian fields that follow the flags.
    (reply_cursor_id,) = struct.unpack("<q", response[4:12])
    (starting_from,) = struct.unpack("<i", response[12:16])
    (number_returned,) = struct.unpack("<i", response[16:20])
    documents = bson.decode_all(response[20:], codec_options)

    result = {"cursor_id": reply_cursor_id,
              "starting_from": starting_from,
              "number_returned": number_returned,
              "data": documents}
    assert len(documents) == number_returned
    return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
                 slave_ok, codec_options, read_preference, cmd, listeners):
    """Run a simple query and return its first (possibly only) batch.

    Builds a ``_Query`` from the arguments, sends its message through
    ``sock_info`` and returns the unpacked reply.  When
    ``listeners.enabled_for_commands`` is true, command start, success and
    failure events are published.  The reported duration is the time spent
    building the message plus the time from just before sending until the
    reply was unpacked (or unpacking failed).  An exception raised while
    unpacking is re-raised after the failure event is published.
    """
    find_op = _Query(
        0, db, coll, 0, query, None,
        codec_options, read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN)
    # The first key of ``cmd`` is used as the command name in events.
    name = next(iter(cmd))
    duration = None
    publish = listeners.enabled_for_commands

    def total_duration():
        # Time since ``start`` plus the earlier message-encoding time.
        return (datetime.datetime.now() - start) + encoding_duration

    if publish:
        start = datetime.datetime.now()
    request_id, msg, max_doc_size = find_op.get_message(
        slave_ok, sock_info.is_mongos)
    if publish:
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(
            cmd, db, request_id, sock_info.address)
        # Restart the clock so the round trip is timed separately.
        start = datetime.datetime.now()

    sock_info.send_message(msg, max_doc_size)
    response = sock_info.receive_message(1, request_id)
    try:
        result = _unpack_response(response, None, codec_options)
    except Exception as exc:
        if publish:
            duration = total_duration()
            failure = (exc.details
                       if isinstance(exc, (NotMasterError, OperationFailure))
                       else _convert_exception(exc))
            listeners.publish_command_failure(
                duration, failure, name, request_id, sock_info.address)
        raise
    if publish:
        duration = total_duration()
        listeners.publish_command_success(
            duration, result, name, request_id, sock_info.address)
    return result
def parse_msg(msg_bytes):
    """Return a command parsed from a binary MongoDB message.

    Returns ``None`` when the message should not be traced: it is empty or
    too short to hold a header, its op code is unknown, it is not a
    "query" message, or no command could be built from it.  The protocol
    is documented here:
    http://docs.mongodb.com/manual/reference/mongodb-wire-protocol
    """
    # NOTE[matt] this is used for queries in pymongo <= 3.0.0 and for inserts
    # in up to date versions.
    msg_len = len(msg_bytes)
    # Reject empty input, and input too short to contain the header, so
    # that unpack_from below cannot fail on a truncated message.
    if msg_len <= 0 or msg_len < header_struct.size:
        return None

    # Header layout: (length, request id, response-to, op code); only the
    # op code is needed here.
    _, _, _, op_code = header_struct.unpack_from(msg_bytes, 0)

    op = OP_CODES.get(op_code)
    if not op:
        log.debug("unknown op code: %s", op_code)
        return None

    if op != "query":
        # Only "query" messages are parsed; other known op codes produce
        # no command (previously this fell through and dereferenced None).
        return None

    # NOTE[matt] inserts, updates and queries can all use this opcode
    offset = header_struct.size + 4  # skip header and flags
    ns = _cstring(msg_bytes[offset:])
    offset += len(ns) + 1  # include null terminator

    # note: here coll could be '$cmd' because it can be overridden in the
    # query itself (like {"insert":"songs"})
    db, coll = _split_namespace(ns)

    offset += 8  # skip numberToSkip & numberToReturn
    if msg_len <= MAX_MSG_PARSE_LEN:
        # FIXME[matt] don't try to parse large messages for performance
        # reasons. ideally we'd just peek at the first bytes to get
        # the critical info (op type, collection, query, # of docs)
        # rather than parse the whole thing. i suspect only massive
        # inserts will be affected.
        codec = CodecOptions(SON)
        spec = next(bson.decode_iter(msg_bytes[offset:], codec_options=codec))
        cmd = parse_spec(spec, db)
    else:
        # let's still note that a command happened.
        cmd = Command("command", db, "untraced_message_too_large")

    # parse_spec is defined elsewhere; guard in case it yields no command.
    if cmd is None:
        return None

    # If the command didn't contain namespace info, set it here.
    if not cmd.coll:
        cmd.coll = coll

    cmd.metrics[netx.BYTES_OUT] = msg_len
    return cmd
def _unpack_response(response,
                     cursor_id=None,
                     codec_options=_UNICODE_REPLACE_CODEC_OPTIONS):
    """Decode a database reply, raising if it reports an error.

    Returns a dict with the keys ``cursor_id``, ``starting_from``,
    ``number_returned`` and ``data`` (the decoded documents).

    Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
    OperationFailure; raises ProtocolError when the reply reports a
    missing cursor but no ``cursor_id`` was supplied.

    :Parameters:
      - `response`: byte string as returned from the database
      - `cursor_id` (optional): cursor_id we sent to get this response -
        used for raising an informative exception when we get cursor id not
        valid at server response
      - `codec_options` (optional): an instance of
        :class:`~bson.codec_options.CodecOptions`
    """
    (flags,) = struct.unpack("<i", response[:4])

    if flags & 1:
        # Shouldn't get this response if we aren't doing a getMore
        if cursor_id is None:
            raise ProtocolError("No cursor id for getMore operation")

        # Fake a getMore command response. OP_GET_MORE provides no document.
        msg = "Cursor not found, cursor id: %d" % (cursor_id,)
        raise CursorNotFound(msg, 43, {"ok": 0, "errmsg": msg, "code": 43})

    if flags & 2:
        # The reply body is a single error document.
        err_doc = bson.BSON(response[20:]).decode()
        # Fake the ok field if it doesn't exist.
        err_doc.setdefault("ok", 0)
        err_text = err_doc["$err"]
        if err_text.startswith("not master"):
            raise NotMasterError(err_text, err_doc)
        err_code = err_doc.get("code")
        if err_code == 50:
            raise ExecutionTimeout(err_text, err_code, err_doc)
        raise OperationFailure("database error: %s" % err_text,
                               err_code,
                               err_doc)

    # Fixed-size little-endian fields that follow the flags.
    (reply_cursor_id,) = struct.unpack("<q", response[4:12])
    (starting_from,) = struct.unpack("<i", response[12:16])
    (number_returned,) = struct.unpack("<i", response[16:20])
    documents = bson.decode_all(response[20:], codec_options)

    result = {"cursor_id": reply_cursor_id,
              "starting_from": starting_from,
              "number_returned": number_returned,
              "data": documents}
    assert len(documents) == number_returned
    return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
                 slave_ok, codec_options, read_preference, cmd, listeners):
    """Send a simple query and return the first (maybe only) batch.

    A ``_Query`` is built from the arguments, its message is sent over
    ``sock_info`` and the unpacked reply is returned.  If
    ``listeners.enabled_for_commands`` is true, start, success and failure
    events are published with a duration equal to the message-encoding
    time plus the time from just before sending until unpacking finished
    or failed.  Exceptions from unpacking are always re-raised.
    """
    first_query = _Query(
        0, db, coll, 0, query, None,
        codec_options, read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN)
    # The first key of ``cmd`` is used as the command name in events.
    name = next(iter(cmd))
    duration = None
    publish = listeners.enabled_for_commands

    if publish:
        start = datetime.datetime.now()
    request_id, msg, max_doc_size = first_query.get_message(
        slave_ok, sock_info.is_mongos)
    if publish:
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(
            cmd, db, request_id, sock_info.address)
        # Restart the clock so the round trip is timed separately.
        start = datetime.datetime.now()

    sock_info.send_message(msg, max_doc_size)
    response = sock_info.receive_message(1, request_id)

    try:
        result = _unpack_response(response, None, codec_options)
    except Exception as exc:
        if not publish:
            raise
        duration = (datetime.datetime.now() - start) + encoding_duration
        if isinstance(exc, (NotMasterError, OperationFailure)):
            failure = exc.details
        else:
            failure = _convert_exception(exc)
        listeners.publish_command_failure(
            duration, failure, name, request_id, sock_info.address)
        raise

    if not publish:
        return result
    duration = (datetime.datetime.now() - start) + encoding_duration
    listeners.publish_command_success(
        duration, result, name, request_id, sock_info.address)
    return result
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
    """Unpack a reply from the database and check it for errors.

    On success a dict is returned holding ``cursor_id``,
    ``starting_from``, ``number_returned`` and ``data`` (the decoded
    documents).

    Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
    OperationFailure.

    :Parameters:
      - `response`: byte string as returned from the database
      - `cursor_id` (optional): cursor_id we sent to get this response -
        used for raising an informative exception when we get cursor id not
        valid at server response
      - `codec_options` (optional): an instance of
        :class:`~bson.codec_options.CodecOptions`
    """
    response_flag = struct.unpack("<i", response[:4])[0]
    cursor_missing = response_flag & 1
    has_error_doc = response_flag & 2

    if cursor_missing:
        # Shouldn't get this response if we aren't doing a getMore
        assert cursor_id is not None

        # Fake a getMore command response. OP_GET_MORE provides no document.
        msg = "Cursor not found, cursor id: %d" % (cursor_id,)
        errobj = {"ok": 0, "errmsg": msg, "code": 43}
        raise CursorNotFound(msg, 43, errobj)

    if has_error_doc:
        details = bson.BSON(response[20:]).decode()
        # Fake the ok field if it doesn't exist.
        details.setdefault("ok", 0)
        message = details["$err"]
        if message.startswith("not master"):
            raise NotMasterError(message, details)
        code = details.get("code")
        if code == 50:
            raise ExecutionTimeout(message, code, details)
        raise OperationFailure("database error: %s" % message, code, details)

    result = {}
    result["cursor_id"] = struct.unpack("<q", response[4:12])[0]
    result["starting_from"] = struct.unpack("<i", response[12:16])[0]
    result["number_returned"] = struct.unpack("<i", response[16:20])[0]
    result["data"] = bson.decode_all(response[20:], codec_options)

    assert len(result["data"]) == result["number_returned"]
    return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
                 slave_ok, codec_options, read_preference, cmd, listeners):
    """Run a simple query and return its first (possibly only) batch.

    Builds a ``_Query`` from the arguments, sends its message through
    ``sock_info`` and returns the unpacked reply.  When
    ``listeners.enabled_for_commands`` is true, command start, success and
    failure events are published.  The reported duration is the time spent
    building the message plus the time from just before sending until the
    reply was unpacked (or unpacking failed).  An exception raised while
    unpacking is re-raised after the failure event is published.
    """
    find_op = _Query(
        0, db, coll, 0, query, None,
        codec_options, read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN)
    # The first key of ``cmd`` is used as the command name in events.
    name = next(iter(cmd))
    duration = None
    publish = listeners.enabled_for_commands

    def total_duration():
        # Time since ``start`` plus the earlier message-encoding time.
        return (datetime.datetime.now() - start) + encoding_duration

    if publish:
        start = datetime.datetime.now()
    request_id, msg, max_doc_size = find_op.get_message(
        slave_ok, sock_info.is_mongos)
    if publish:
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(
            cmd, db, request_id, sock_info.address)
        # Restart the clock so the round trip is timed separately.
        start = datetime.datetime.now()

    sock_info.send_message(msg, max_doc_size)
    response = sock_info.receive_message(1, request_id)
    try:
        result = _unpack_response(response, None, codec_options)
    except Exception as exc:
        if publish:
            duration = total_duration()
            failure = (exc.details
                       if isinstance(exc, (NotMasterError, OperationFailure))
                       else _convert_exception(exc))
            listeners.publish_command_failure(
                duration, failure, name, request_id, sock_info.address)
        raise
    if publish:
        duration = total_duration()
        listeners.publish_command_success(
            duration, result, name, request_id, sock_info.address)
    return result
def _unpack_response(response,
                     cursor_id=None,
                     codec_options=_UNICODE_REPLACE_CODEC_OPTIONS):
    """Unpack a reply from the database and check it for errors.

    On success a dict is returned holding ``cursor_id``,
    ``starting_from``, ``number_returned`` and ``data`` (the decoded
    documents).

    Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
    OperationFailure; raises ProtocolError when the reply reports a
    missing cursor but no ``cursor_id`` was supplied.

    :Parameters:
      - `response`: byte string as returned from the database
      - `cursor_id` (optional): cursor_id we sent to get this response -
        used for raising an informative exception when we get cursor id not
        valid at server response
      - `codec_options` (optional): an instance of
        :class:`~bson.codec_options.CodecOptions`
    """
    response_flag = struct.unpack("<i", response[:4])[0]
    cursor_missing = response_flag & 1
    has_error_doc = response_flag & 2

    if cursor_missing:
        # Shouldn't get this response if we aren't doing a getMore
        if cursor_id is None:
            raise ProtocolError("No cursor id for getMore operation")

        # Fake a getMore command response. OP_GET_MORE provides no document.
        msg = "Cursor not found, cursor id: %d" % (cursor_id,)
        errobj = {"ok": 0, "errmsg": msg, "code": 43}
        raise CursorNotFound(msg, 43, errobj)

    if has_error_doc:
        details = bson.BSON(response[20:]).decode()
        # Fake the ok field if it doesn't exist.
        details.setdefault("ok", 0)
        message = details["$err"]
        if message.startswith("not master"):
            raise NotMasterError(message, details)
        code = details.get("code")
        if code == 50:
            raise ExecutionTimeout(message, code, details)
        raise OperationFailure("database error: %s" % message, code, details)

    result = {}
    result["cursor_id"] = struct.unpack("<q", response[4:12])[0]
    result["starting_from"] = struct.unpack("<i", response[12:16])[0]
    result["number_returned"] = struct.unpack("<i", response[16:20])[0]
    result["data"] = bson.decode_all(response[20:], codec_options)

    assert len(result["data"]) == result["number_returned"]
    return result
def _first_batch(sock_info, db, coll, query, ntoreturn,
                 slave_ok, codec_options, read_preference, cmd, listeners):
    """Send a simple query and return the first (maybe only) batch.

    A ``_Query`` is built from the arguments (its final positional
    argument is passed as ``None``), its message is sent over
    ``sock_info`` and the unpacked reply is returned.  If
    ``listeners.enabled_for_commands`` is true, start, success and failure
    events are published with a duration equal to the message-encoding
    time plus the time from just before sending until unpacking finished
    or failed.  Exceptions from unpacking are always re-raised.
    """
    first_query = _Query(
        0, db, coll, 0, query, None, codec_options,
        read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None)
    # The first key of ``cmd`` is used as the command name in events.
    name = next(iter(cmd))
    duration = None
    publish = listeners.enabled_for_commands

    if publish:
        start = datetime.datetime.now()
    request_id, msg, max_doc_size = first_query.get_message(
        slave_ok, sock_info.is_mongos)
    if publish:
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(
            cmd, db, request_id, sock_info.address)
        # Restart the clock so the round trip is timed separately.
        start = datetime.datetime.now()

    sock_info.send_message(msg, max_doc_size)
    response = sock_info.receive_message(1, request_id)

    try:
        result = _unpack_response(response, None, codec_options)
    except Exception as exc:
        if not publish:
            raise
        duration = (datetime.datetime.now() - start) + encoding_duration
        if isinstance(exc, (NotMasterError, OperationFailure)):
            failure = exc.details
        else:
            failure = _convert_exception(exc)
        listeners.publish_command_failure(
            duration, failure, name, request_id, sock_info.address)
        raise

    if not publish:
        return result
    duration = (datetime.datetime.now() - start) + encoding_duration
    listeners.publish_command_success(
        duration, result, name, request_id, sock_info.address)
    return result