test_response_future.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码
def test_prepared_query_not_found(self):
    """A PreparedQueryNotFound result should trigger a re-prepare submission.

    After the request is sent, the cluster's prepared-statement cache is
    replaced by a mock whose lookup yields a statement in the same keyspace
    as the connection. Feeding a PreparedQueryNotFound result to the
    response future must then call ``session.submit`` with
    ``rf._reprepare`` and a ``PrepareMessage`` carrying the statement's
    query string.
    """
    query_string = "SELECT * FROM foobar"
    keyspace = "FooKeyspace"

    # Session whose pool lookup hands back a mocked connection.
    session = self.make_session()
    mock_connection = Mock(spec=Connection)
    mock_pool = session._pools.get.return_value
    mock_pool.borrow_connection.return_value = (mock_connection, 1)

    rf = self.make_response_future(session)
    rf.send_request()

    # Any key looked up in the statement cache resolves to the same mock
    # statement, so the id carried by the error result does not matter.
    statement_cache = MagicMock(dict)
    session.cluster._prepared_statements = statement_cache
    cached_statement = statement_cache.__getitem__.return_value
    cached_statement.query_string = query_string
    cached_statement.keyspace = keyspace
    rf._connection.keyspace = keyspace

    not_found = Mock(spec=PreparedQueryNotFound, info='a' * 16)
    rf._set_result(None, None, None, not_found)

    self.assertTrue(session.submit.call_args)
    submit_args, _submit_kwargs = session.submit.call_args
    # The callback and the message are addressed from the end of the
    # positional argument list, exactly as the submit call lays them out.
    reprepare_callback, prepare_message = submit_args[-5], submit_args[-4]
    self.assertEqual(rf._reprepare, reprepare_callback)
    self.assertIsInstance(prepare_message, PrepareMessage)
    self.assertEqual(prepare_message.query, query_string)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号