def test_prepared_query_not_found(self):
    """Check that a PreparedQueryNotFound result submits a re-prepare.

    The test sends a request through a mocked session, feeds the response
    future a mocked ``PreparedQueryNotFound`` result, and then inspects the
    positional arguments of the resulting ``session.submit`` call: the
    fifth-from-last must be ``rf._reprepare`` and the fourth-from-last a
    ``PrepareMessage`` carrying the cached statement's query string.
    """
    session = self.make_session()
    pool = session._pools.get.return_value
    connection = Mock(spec=Connection)
    pool.borrow_connection.return_value = (connection, 1)

    rf = self.make_response_future(session)
    rf.send_request()

    # Replace the cluster's prepared-statement mapping with a dict-spec'd
    # mock so that a lookup with any key returns the same statement object.
    session.cluster._prepared_statements = MagicMock(dict)
    prepared_statement = session.cluster._prepared_statements.__getitem__.return_value
    prepared_statement.query_string = "SELECT * FROM foobar"
    prepared_statement.keyspace = "FooKeyspace"
    # The connection keyspace is set equal to the statement keyspace.
    # NOTE(review): presumably this keeps the code under test from taking
    # a keyspace-mismatch path -- confirm against ResponseFuture.
    rf._connection.keyspace = "FooKeyspace"

    # NOTE(review): `info` looks like the 16-character id of the unknown
    # prepared statement -- confirm against PreparedQueryNotFound.
    result = Mock(spec=PreparedQueryNotFound, info='a' * 16)
    rf._set_result(None, None, None, result)

    # `call_args` is None until the mock has been called, so this fails
    # when no submit happened at all.
    self.assertTrue(session.submit.call_args)
    args, _ = session.submit.call_args
    # Positions are counted from the end, exactly as in the original test;
    # the remaining positional arguments are not checked here.
    self.assertEqual(rf._reprepare, args[-5])
    self.assertIsInstance(args[-4], PrepareMessage)
    self.assertEqual(args[-4].query, "SELECT * FROM foobar")
test_response_future.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录