def test_concurrent_execution(self):
def slave():
cnn = self.connect()
cur = cnn.cursor()
cur.execute("select pg_sleep(4)")
cur.close()
cnn.close()
t1 = threading.Thread(target=slave)
t2 = threading.Thread(target=slave)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
self.assertTrue(time.time() - t0 < 7,
"something broken in concurrency")
test_connection.py 文件源码
python
阅读 30
收藏 0
点赞 0
评论 0
评论列表
文章目录