test_io.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        ``item`` is the unit of data that is repeated to build a payload
        longer than ``support.PIPE_MAX_SIZE``.  ``bytes`` is the byte string
        whose first two bytes are expected to come out of the pipe, in
        order.  ``fdopen_kwargs`` are forwarded to ``self.io.open()``, with
        ``closefd`` forced to False so that this method owns the descriptor.

        NOTE(review): the SIGALRM handler that raises ZeroDivisionError is
        not installed here; presumably the test fixture sets it up.
        """
        read_results = []

        def _read():
            # Block SIGALRM in this helper thread where the platform allows
            # it (presumably so the alarm hits the thread doing the write).
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Pre-bind so the cleanup below cannot hit an unbound name (and mask
        # the real error) when self.io.open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            # Build the payload before arming the alarm so the large
            # allocation is not charged against the one-second window.
            payload = item * (support.PIPE_MAX_SIZE // len(item) + 1)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the alarm armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, payload)
            finally:
                # Always disarm, even if the assertion fails.
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    # EBADF is the expected outcome of flushing to the
                    # already-closed descriptor; anything else is real.
                    if e.errno != errno.EBADF:
                        raise
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号