def test_process_does_not_block(self):
mock_socket = Mock()
mock_socket.recv_unicode.side_effect = zmq.Again()
server = ControlServer(None, connection_string='127.0.0.1:10000')
server._socket = mock_socket
assertRaisesNothing(self, server.process)
mock_socket.recv_unicode.assert_has_calls([call(flags=zmq.NOBLOCK)])
评论列表
文章目录