def test_notify_deque(self):
    """Check that notifications are collected into a user-supplied deque.

    Replaces ``self.conn.notifies`` with an empty ``collections.deque``,
    listens on channel ``'foo'``, triggers a notification on that channel
    and, after ``poll()``, expects exactly one
    ``psycopg2.extensions.Notify`` object in the deque.
    """
    from collections import deque

    self.autocommit(self.conn)
    self.conn.notifies = deque()
    self.listen('foo')
    # communicate() is called on whatever notify() returns before moving on
    # (presumably this waits for a helper process to finish -- confirm
    # against the notify() helper).
    self.notify('foo').communicate()
    # NOTE(review): fixed pause, presumably to let the notification reach
    # this connection before polling -- confirm.
    time.sleep(0.5)
    self.conn.poll()

    # Fail with a clear assertion, rather than an IndexError raised by
    # popleft(), when nothing was delivered.
    self.assertEqual(len(self.conn.notifies), 1)
    notify = self.conn.notifies.popleft()
    # assertIsInstance reports the actual type on failure, which
    # assertTrue(isinstance(...)) does not.
    self.assertIsInstance(notify, psycopg2.extensions.Notify)
    self.assertEqual(len(self.conn.notifies), 0)
评论列表
文章目录