def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that an interrupted partial write runs the signal handler.

    A pipe is created and a helper thread reads a single byte from it
    while this thread writes more data than the pipe can hold.  An alarm
    is scheduled to fire during the blocking write, and the exception
    raised by the signal handler must bubble up out of ``wio.write``.
    Afterwards one more byte is read from the pipe to verify that the
    first byte was not written twice.

    Args:
        item: Unit of data to write; it is repeated until the payload is
            larger than ``support.PIPE_MAX_SIZE``.
        bytes: Expected raw data; its first two one-byte slices are
            compared with the two bytes read back from the pipe.
        **fdopen_kwargs: Extra keyword arguments for ``self.io.open``.
            ``closefd`` is always forced to ``False``.

    NOTE(review): this presumes a SIGALRM handler that raises
    ZeroDivisionError is already installed (the comments below refer to
    ``alarm_interrupt()``) -- confirm against the test fixture.
    """
    read_results = []

    def _read():
        # Block SIGALRM in this helper thread where the platform allows
        # it -- presumably so the alarm interrupts the writer instead of
        # this read (TODO confirm).
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
        s = os.read(r, 1)
        read_results.append(s)

    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # Bound before the ``try`` so that the cleanup below never trips over
    # an unbound name (and hides the real error) if ``open`` itself fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the alarm armed just below.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        signal.alarm(1)
        try:
            self.assertRaises(
                ZeroDivisionError,
                wio.write,
                item * (support.PIPE_MAX_SIZE // len(item) + 1))
        finally:
            # Always disarm the alarm, whether or not the write raised.
            signal.alarm(0)
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                # EBADF is expected here because the descriptor is
                # already closed; anything else is a real failure.
                if e.errno != errno.EBADF:
                    raise
# [Web page navigation residue, not part of the code: "Comment list" / "Article table of contents"]