def test_cpu_usage_after_pub_send_or_dealer_recv(self):
    """Check idle CPU usage after a PUB send and after a DEALER recv.

    Guards against zmq eating CPU once a PUB socket has sent a message
    or a DEALER socket has received one.
    Same https://bitbucket.org/eventlet/eventlet/issue/128
    """
    # Both phases pass the same two arguments to the idle-CPU check.
    # NOTE(review): presumably (duration, allowed fraction) -- confirm
    # against tests.check_idle_cpu_usage.
    idle_check_args = (0.2, 0.1)

    # Phase 1: PUB/SUB pair -- send only, then check idle CPU usage.
    publisher, subscriber, _ = self.create_bound_pair(zmq.PUB, zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    eventlet.sleep()
    publisher.send(b'test_send')
    tests.check_idle_cpu_usage(*idle_check_args)

    # Phase 2: DEALER/DEALER pair -- send, receive, verify the payload,
    # then check idle CPU usage again.
    payload = b'test_recv'
    dealer_out, dealer_in, _ = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
    eventlet.sleep()
    dealer_out.send(payload)
    self.assertEqual(dealer_in.recv(), payload)
    tests.check_idle_cpu_usage(*idle_check_args)
# [web page residue] Comment list
# [web page residue] Article table of contents