def test_query_retry(self):
pool = self.pool
sql = (
'set session wait_timeout=1;'
)
pool.query(sql)
pool.query('show variables like "%timeout%";')
with pool() as conn:
time.sleep(2)
with self.assertRaises(MySQLdb.OperationalError):
print conn.query('show databases')
# no error raise from above, thus a timed out conn has been left in
# pool
stat = pool('stat')
dd('stat after timeout', stat)
self.assertEqual(1, stat['create'], 'created 1 conn')
# use previous conn, timed out and retry.
pool.query('show databases', retry=1)
stat = pool('stat')
dd('stat after retry', stat)
self.assertEqual(2, stat['create'], 'created another conn for retry')
评论列表
文章目录