def crappy_callback(self, conn):
"""green callback failing after `self.to_error` time it is called"""
import select
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
while 1:
if self.to_error is not None:
self.to_error -= 1
if self.to_error <= 0:
raise ZeroDivisionError("I accidentally the connection")
try:
state = conn.poll()
if state == POLL_OK:
break
elif state == POLL_READ:
select.select([conn.fileno()], [], [])
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [])
else:
raise conn.OperationalError("bad state from poll: %s" % state)
except KeyboardInterrupt:
conn.cancel()
# the loop will be broken by a server error
continue
评论列表
文章目录