def test_destroy(self):
"""Context.destroy should close sockets"""
ctx = self.Context()
sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
# close half of the sockets
[ s.close() for s in sockets[::2] ]
ctx.destroy()
# reaper is not instantaneous
time.sleep(1e-2)
for s in sockets:
self.assertTrue(s.closed)
评论列表
文章目录