test_kqueue.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码
def testPair(self):
        kq = select.kqueue()
        a, b = socket.socketpair()

        a.send(b'foo')
        event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        self.assertEqual(b.recv(r[0].data), b'foo')

        a.close()
        b.close()
        kq.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号