def _test_bug_551(self, query):
    """Run *query* in a loop in a self-aborting child and check it dies quietly.

    A child interpreter is started with a script that connects using the
    module-level ``dsn``, creates table ``test551`` and then executes
    *query* in an endless loop.  A daemon thread in the child sends
    SIGABRT to its own process after 0.5 seconds; the installed handler
    turns that into ``sys.exit(1)``.

    The test passes when the child exits with a non-zero status and its
    stderr is empty once bracketed markers such as ``[NNN refs]`` have
    been removed.

    :param query: SQL statement with a single placeholder; the child
        executes it repeatedly with the parameter ``"Hello, world!"``.
    """
    # The child script must keep valid Python indentation: it is passed
    # verbatim to ``python -c``.  ``%(dsn)r`` / ``%(query)r`` embed the
    # values as Python literals, so no extra quoting is needed.
    script = ("""\
import os
import sys
import time
import signal
import threading
import psycopg2
def handle_sigabort(sig, frame):
    sys.exit(1)
def killer():
    time.sleep(0.5)
    os.kill(os.getpid(), signal.SIGABRT)
signal.signal(signal.SIGABRT, handle_sigabort)
conn = psycopg2.connect(%(dsn)r)
cur = conn.cursor()
cur.execute("create table test551 (id serial, num varchar(50))")
t = threading.Thread(target=killer)
t.daemon = True
t.start()
while True:
    cur.execute(%(query)r, ("Hello, world!",))
""" % {'dsn': dsn, 'query': query})

    proc = sp.Popen(
        [sys.executable, '-c', script_to_py3(script)],
        stdout=sp.PIPE, stderr=sp.PIPE)
    # communicate() drains both pipes and waits for the child to exit.
    _, err = proc.communicate()

    # The SIGABRT handler exits with status 1, so success is never expected.
    self.assertNotEqual(proc.returncode, 0)

    # Strip [NNN refs] from output
    err = re.sub(br'\[[^\]]+\]', b'', err).strip()
    self.assertFalse(err, err)
评论列表
文章目录