def batch_size(self, batch_size: int) -> 'Cursor':
"""Limits the number of documents returned in one batch. Each batch
requires a round trip to the server. It can be adjusted to optimize
performance and limit data transfer.
.. note:: batch_size cannot override MongoDB's internal limits on the
amount of data it will return to the client in a single batch (i.e.
if you set batch size to 1,000,000,000, MongoDB will currently only
return 4-16MB of results per batch).
Raises :exc:`TypeError` if `batch_size` is not an integer.
Raises :exc:`ValueError` if `batch_size` is less than ``0``.
Raises :exc:`~pymongo.errors.InvalidOperation` if this
:class:`Cursor` has already been used. The last `batch_size`
applied to this cursor takes precedence.
:Parameters:
- `batch_size`: The size of each batch of results requested.
"""
if not isinstance(batch_size, int):
raise TypeError('batch_size must be an integer')
if batch_size < 0:
raise ValueError('batch_size must be >= 0')
self.__check_okay_to_chain()
self.__batch_size = batch_size
return self
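A minimal usage sketch for the method above, assuming an async collection object from this driver named `collection`; the chaining-after-use restriction is the part that raises InvalidOperation.

from pymongo.errors import InvalidOperation

async def batch_size_demo(collection):
    # Ask the server for results in batches of 100 documents; chaining returns the cursor.
    cursor = collection.find({}).batch_size(100)
    async for doc in cursor:
        break  # the cursor has now been used
    try:
        cursor.batch_size(10)  # too late: configuration cannot change after first use
    except InvalidOperation as exc:
        print('cannot re-chain batch_size:', exc)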
def max_scan(self, max_scan: int) -> 'Cursor':
"""Limit the number of documents to scan when performing the query.
Raises :class:`~pymongo.errors.InvalidOperation` if this
cursor has already been used. Only the last :meth:`max_scan`
applied to this cursor has any effect.
:Parameters:
- `max_scan`: the maximum number of documents to scan
"""
self.__check_okay_to_chain()
self.__max_scan = max_scan
return self
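Chaining `max_scan` follows the same rule; a short sketch under the same assumptions (MongoDB has since deprecated the underlying $maxScan option, so this is purely illustrative):

async def max_scan_demo(collection):
    # Examine at most 500 documents while evaluating the filter.
    cursor = collection.find({'x': {'$gt': 10}}).max_scan(500)
    return [doc async for doc in cursor]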
async def execute(self, write_concern: dict) -> dict:
"""Execute operations.
"""
if not self.ops:
raise InvalidOperation('No operations to execute')
if self.executed:
raise InvalidOperation('Bulk operations can '
'only be executed once.')
self.executed = True
write_concern = (WriteConcern(**write_concern) if
write_concern else self.collection.write_concern)
if self.ordered:
generator = self.gen_ordered()
else:
generator = self.gen_unordered()
connection = await self.collection.database.client.get_connection()
if connection.max_wire_version < 5 and self.uses_collation:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
if not write_concern.acknowledged:
if self.uses_collation:
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
await self.execute_no_results(connection, generator)
else:
return await self.execute_command(connection, generator, write_concern)
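A hedged sketch of driving this coroutine, assuming a bulk builder object that exposes the same `find(...).update(...)` chaining used by `add_members_to_group` below; only `execute` itself comes from the snippet above. Re-executing raises InvalidOperation, as does executing with no queued operations.

from pymongo.errors import InvalidOperation

async def bulk_demo(bulk):
    # Queue two updates on the (assumed) bulk builder, then run them acknowledged.
    bulk.find({'username': 'alice'}).update({'$set': {'active': True}})
    bulk.find({'username': 'bob'}).update({'$push': {'groups': 'admins'}})
    result = await bulk.execute({'w': 1})
    print(result)
    try:
        await bulk.execute({'w': 1})  # a bulk can only be executed once
    except InvalidOperation as exc:
        print('already executed:', exc)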
def add_members_to_group(self, members_username, group_name):
found_group = None
if not isinstance(members_username, list):
members_username = [members_username]
for group in self.groups:
if group['name'] == group_name:
found_group = group
break
if found_group:
new_members = [
member for member in members_username
if member not in found_group['users']
]
for member in new_members:
self.bulk.find({'$and': [
{'username': self.username},
{'groups.name': group_name}
]}).update({'$push': {'groups.$.users': member}})
try:
self.bulk.execute()
except InvalidOperation as e:
print(e)
else:
print('Group {} does not exist!'.format(group_name))
def _raise_if_unacknowledged(self, property_name):
"""Raise an exception on property access if unacknowledged."""
if not self.__acknowledged:
raise InvalidOperation("A value for %s is not available when "
"the write is unacknowledged. Check the "
"acknowledged attribute to avoid this "
"error." % (property_name,))
def __init__(self, bulk_api_result, acknowledged):
"""Create a BulkWriteResult instance.
:Parameters:
- `bulk_api_result`: A result dict from the bulk API
- `acknowledged`: Was this write result acknowledged? If ``False``
then all properties of this object will raise
:exc:`~pymongo.errors.InvalidOperation`.
"""
self.__bulk_api_result = bulk_api_result
super(BulkWriteResult, self).__init__(acknowledged)
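To see `_raise_if_unacknowledged` from the outside, a small sketch against PyMongo's public, synchronous API (assuming a local mongod on the default port): with w=0 the server sends no reply, so count-style result properties raise InvalidOperation.

from pymongo import MongoClient, WriteConcern
from pymongo.errors import InvalidOperation

client = MongoClient()
unacked = client.test.get_collection('demo', write_concern=WriteConcern(w=0))

result = unacked.update_one({'x': 1}, {'$set': {'y': 2}})
print(result.acknowledged)       # False for a w=0 write
try:
    print(result.matched_count)  # not available for unacknowledged writes
except InvalidOperation as exc:
    print(exc)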
def insert(collection_name, docs, check_keys,
safe, last_error_args, continue_on_error, uuid_subtype):
"""Get an **insert** message.
.. note:: As of PyMongo 2.6, this function is no longer used. It
is being kept (with tests) for backwards compatibility with 3rd
party libraries that may currently be using it, but will likely
be removed in a future release.
"""
options = 0
if continue_on_error:
options += 1
data = struct.pack("<i", options)
data += bson._make_c_string(collection_name)
encoded = [bson.BSON.encode(doc, check_keys, uuid_subtype) for doc in docs]
if not encoded:
raise InvalidOperation("cannot do an empty bulk insert")
max_bson_size = max(map(len, encoded))
data += _EMPTY.join(encoded)
if safe:
(_, insert_message) = __pack_message(2002, data)
(request_id, error_message, _) = __last_error(collection_name,
last_error_args)
return (request_id, insert_message + error_message, max_bson_size)
else:
(request_id, insert_message) = __pack_message(2002, data)
return (request_id, insert_message, max_bson_size)
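A hedged illustration of the legacy builder above (a PyMongo 2.x-era helper), assuming the function and its private helpers are in scope exactly as shown; the empty-batch guard is what raises InvalidOperation here.

from pymongo.errors import InvalidOperation

OLD_UUID_SUBTYPE = 3  # legacy uuid_subtype value from that era, hard-coded here for illustration

try:
    # An empty document list is rejected before any wire message is built.
    insert('db.coll', [], True, False, {}, False, OLD_UUID_SUBTYPE)
except InvalidOperation as exc:
    print(exc)  # cannot do an empty bulk insert

# A one-document unsafe insert returns (request_id, message, max_bson_size).
request_id, msg, max_bson_size = insert(
    'db.coll', [{'x': 1}], True, False, {}, False, OLD_UUID_SUBTYPE)
print(request_id, len(msg), max_bson_size)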
async def test_limit(self, test_db):
with pytest.raises(TypeError):
test_db.test.find().limit()
with pytest.raises(TypeError):
test_db.test.find().limit('hello')
with pytest.raises(TypeError):
test_db.test.find().limit(5.5)
assert test_db.test.find().limit(5)
await test_db.test.drop()
await test_db.test.insert_many([{'x': i} for i in range(100)])
count = 0
async for _ in test_db.test.find():
count += 1
assert count == 100
count = 0
async for _ in test_db.test.find().limit(20):
count += 1
assert count == 20
count = 0
async for _ in test_db.test.find().limit(99):
count += 1
assert count == 99
count = 0
async for _ in test_db.test.find().limit(1):
count += 1
assert count == 1
count = 0
async for _ in test_db.test.find().limit(0):
count += 1
assert count == 100
count = 0
async for _ in test_db.test.find().limit(0).limit(50).limit(10):
count += 1
assert count == 10
a = test_db.test.find()
a.limit(10)
async for _ in a:
break
with pytest.raises(InvalidOperation):
a.limit(5)
async def test_batch_size(self, test_db, mongo_version):
await test_db.test.insert_many([{'x': x} for x in range(200)])
with pytest.raises(TypeError):
test_db.test.find().batch_size(None)
with pytest.raises(TypeError):
test_db.test.find().batch_size('hello')
with pytest.raises(TypeError):
test_db.test.find().batch_size(5.5)
with pytest.raises(ValueError):
test_db.test.find().batch_size(-1)
assert test_db.test.find().batch_size(5)
a = test_db.test.find()
async for _ in a:
break
with pytest.raises(InvalidOperation):
a.batch_size(5)
async def cursor_count(cursor, expected_count):
count = 0
async with cursor:
async for _ in cursor:
count += 1
assert expected_count == count
await cursor_count(test_db.test.find().batch_size(0), 200)
await cursor_count(test_db.test.find().batch_size(1), 200)
await cursor_count(test_db.test.find().batch_size(2), 200)
await cursor_count(test_db.test.find().batch_size(5), 200)
await cursor_count(test_db.test.find().batch_size(100), 200)
await cursor_count(test_db.test.find().batch_size(500), 200)
await cursor_count(test_db.test.find().batch_size(0).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(1).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(2).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(5).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(100).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(500).limit(1), 1)
await cursor_count(test_db.test.find().batch_size(0).limit(10), 10)
await cursor_count(test_db.test.find().batch_size(1).limit(10), 10)
await cursor_count(test_db.test.find().batch_size(2).limit(10), 10)
await cursor_count(test_db.test.find().batch_size(5).limit(10), 10)
await cursor_count(test_db.test.find().batch_size(100).limit(10), 10)
await cursor_count(test_db.test.find().batch_size(500).limit(10), 10)
cur = test_db.test.find().batch_size(1)
await self._next(cur)
if mongo_version.at_least(3, 1, 9):
# find command batchSize should be 1
assert 0 == len(cur._Cursor__data)
else:
# OP_QUERY ntoreturn should be 2
assert 1 == len(cur._Cursor__data)
await self._next(cur)
assert 0 == len(cur._Cursor__data)
await self._next(cur)
assert 0 == len(cur._Cursor__data)
await self._next(cur)
assert 0 == len(cur._Cursor__data)
async def test_sort(self, test_db):
with pytest.raises(TypeError):
test_db.test.find().sort(5)
with pytest.raises(ValueError):
test_db.test.find().sort([])
with pytest.raises(TypeError):
test_db.test.find().sort([], ASCENDING)
with pytest.raises(TypeError):
test_db.test.find().sort([('hello', DESCENDING)], DESCENDING)
unsort = list(range(10))
random.shuffle(unsort)
await test_db.test.insert_many([{'x': i} for i in unsort])
asc = [i['x'] for i in await test_db.test.find().sort('x', ASCENDING).to_list()]
assert asc == list(range(10))
asc = [i['x'] for i in await test_db.test.find().sort('x').to_list()]
assert asc == list(range(10))
asc = [i['x'] for i in await test_db.test.find().sort([('x', ASCENDING)]).to_list()]
assert asc == list(range(10))
expect = list(reversed(range(10)))
desc = [i['x'] for i in await test_db.test.find().sort('x', DESCENDING).to_list()]
assert desc == expect
desc = [i['x'] for i in await test_db.test.find().sort([('x', DESCENDING)]).to_list()]
assert desc == expect
desc = [i['x'] for i in
await test_db.test.find().sort('x', ASCENDING).sort('x', DESCENDING).to_list()]
assert desc == expect
expected = [(1, 5), (2, 5), (0, 3), (7, 3), (9, 2), (2, 1), (3, 1)]
shuffled = list(expected)
random.shuffle(shuffled)
await test_db.test.drop()
for (a, b) in shuffled:
await test_db.test.insert_one({'a': a, 'b': b})
result = [(i['a'], i['b']) for i in
await test_db.test.find().sort([('b', DESCENDING),
('a', ASCENDING)]).to_list()]
assert result == expected
a = test_db.test.find()
a.sort('x', ASCENDING)
async for _ in a:
break
with pytest.raises(InvalidOperation):
a.sort('x', ASCENDING)
async def test_comment(self, mongo, test_db, mongo_version):
connection = await mongo.get_connection()
if connection.is_mongos:
pytest.skip('Not supported via mongos')
return
# MongoDB 3.1.5 changed the ns for commands.
regex = {'$regex': r'{}.(\$cmd|test)'.format(test_db.name)}
if mongo_version.at_least(3, 1, 8, -1):
query_key = 'query.comment'
else:
query_key = 'query.$comment'
await test_db.set_profiling_level(ALL)
try:
await test_db.test.find().comment('foo').to_list()
op = test_db.system.profile.find({
'ns': '{}.test'.format(test_db.name),
'op': 'query',
query_key: 'foo'
})
assert await op.count() == 1
await test_db.test.find().comment('foo').count()
op = test_db.system.profile.find({
'ns': regex,
'op': 'command',
'command.count': 'test',
'command.$comment': 'foo'
})
assert await op.count() == 1
await test_db.test.find().comment('foo').distinct('type')
op = test_db.system.profile.find({
'ns': regex,
'op': 'command',
'command.distinct': 'test',
'command.$comment': 'foo'
})
assert await op.count() == 1
finally:
await test_db.set_profiling_level(OFF)
await test_db.system.profile.drop()
await test_db.test.insert_many([{}, {}])
cursor = test_db.test.find()
await self._next(cursor)
with pytest.raises(InvalidOperation):
cursor.comment('hello')