test_cursor.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:aiomongo 作者: ZeoAlliance 项目源码 文件源码
async def test_batch_size(self, test_db, mongo_version):
        """Exercise ``batch_size()`` validation and its effect on iteration.

        The test inserts 200 documents into ``test_db.test`` and then checks
        that:

        * ``batch_size()`` raises ``TypeError`` for ``None``, a string and a
          float, and ``ValueError`` for a negative value;
        * ``batch_size()`` raises ``InvalidOperation`` once iteration over the
          cursor has started;
        * the number of documents yielded matches the expected count for a
          range of batch sizes, with and without ``limit()``;
        * the cursor's internal buffer holds the expected number of documents
          after each fetch when the batch size is 1.

        Must be a coroutine function because the body awaits database calls.
        """
        await test_db.test.insert_many([{'x': x} for x in range(200)])

        # Argument validation: wrong types first, then an out-of-range value.
        with pytest.raises(TypeError):
            test_db.test.find().batch_size(None)
        with pytest.raises(TypeError):
            test_db.test.find().batch_size('hello')
        with pytest.raises(TypeError):
            test_db.test.find().batch_size(5.5)
        with pytest.raises(ValueError):
            test_db.test.find().batch_size(-1)
        assert test_db.test.find().batch_size(5)

        # Start iterating so that the cursor is no longer configurable.
        a = test_db.test.find()
        async for _ in a:
            break

        with pytest.raises(InvalidOperation):
            a.batch_size(5)

        async def cursor_count(cursor, expected_count):
            """Iterate ``cursor`` fully and check how many documents it yields."""
            count = 0
            async with cursor:
                async for _ in cursor:
                    count += 1
            # Compare the values; ``assert expected_count, count`` would only
            # test the truthiness of ``expected_count`` and never fail here.
            assert count == expected_count

        await cursor_count(test_db.test.find().batch_size(0), 200)
        await cursor_count(test_db.test.find().batch_size(1), 200)
        await cursor_count(test_db.test.find().batch_size(2), 200)
        await cursor_count(test_db.test.find().batch_size(5), 200)
        await cursor_count(test_db.test.find().batch_size(100), 200)
        await cursor_count(test_db.test.find().batch_size(500), 200)

        await cursor_count(test_db.test.find().batch_size(0).limit(1), 1)
        await cursor_count(test_db.test.find().batch_size(1).limit(1), 1)
        await cursor_count(test_db.test.find().batch_size(2).limit(1), 1)
        await cursor_count(test_db.test.find().batch_size(5).limit(1), 1)
        await cursor_count(test_db.test.find().batch_size(100).limit(1), 1)
        await cursor_count(test_db.test.find().batch_size(500).limit(1), 1)

        await cursor_count(test_db.test.find().batch_size(0).limit(10), 10)
        await cursor_count(test_db.test.find().batch_size(1).limit(10), 10)
        await cursor_count(test_db.test.find().batch_size(2).limit(10), 10)
        await cursor_count(test_db.test.find().batch_size(5).limit(10), 10)
        await cursor_count(test_db.test.find().batch_size(100).limit(10), 10)
        await cursor_count(test_db.test.find().batch_size(500).limit(10), 10)

        # Inspect the name-mangled private buffer (``Cursor.__data``) to see
        # how many documents remain buffered after each fetch.
        cur = test_db.test.find().batch_size(1)
        await self._next(cur)
        if mongo_version.at_least(3, 1, 9):
            # find command batchSize should be 1
            assert 0 == len(cur._Cursor__data)
        else:
            # OP_QUERY ntoreturn should be 2
            assert 1 == len(cur._Cursor__data)
        await self._next(cur)
        assert 0 == len(cur._Cursor__data)
        await self._next(cur)
        assert 0 == len(cur._Cursor__data)
        await self._next(cur)
        assert 0 == len(cur._Cursor__data)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号