def test_poll(self, cursor):
@gen.engine
def f():
yield cursor.poll()
self.stop()
self.assertRaises(presto.ProgrammingError, self.run_gen, f)
yield cursor.execute('SELECT * FROM one_row')
while True:
status = yield cursor.poll()
if status is None:
break
self.assertIn('stats', status)
def fail(*args, **kwargs):
self.fail("Should not need requests.get after done polling") # pragma: no cover
with mock.patch.object(AsyncHTTPClient, 'fetch') as fetch:
fetch.side_effect = fail
self.assertEqual((yield cursor.fetchall()), [[1]])
评论列表
文章目录