def test_020_get_events_reader_local(self):
    """Check ``_get_events_reader`` when qubesd is reached over a socket.

    ``asyncio.open_unix_connection`` is patched so that the dispatcher is
    handed one end of a local ``socketpair`` (via
    ``self.mock_open_unix_connection``) while the other end is drained in
    an executor thread by ``self.read_all``.  The test then verifies that:

    * the bytes received on the peer socket are exactly the
      ``admin.Events`` request header, and
    * the dispatcher returns a pair whose first element is an
      ``asyncio.StreamReader`` and whose second element is a callable
      cleanup function.

    The peer socket and the event loop are released in a ``finally``
    clause so that a failing assertion does not leak them into
    subsequent tests.
    """
    self.app.qubesd_connection_type = 'socket'
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    sock1, sock2 = socket.socketpair()
    try:
        with unittest.mock.patch(
                'asyncio.open_unix_connection',
                lambda path: self.mock_open_unix_connection(
                    qubesadmin.config.QUBESD_SOCKET, sock1, path)):
            task = asyncio.ensure_future(
                self.dispatcher._get_events_reader())
            # read_all is a blocking call, so it runs in the loop's
            # default executor while the dispatcher task runs on the loop.
            reader = asyncio.ensure_future(
                loop.run_in_executor(None, self.read_all, sock2))
            loop.run_until_complete(asyncio.wait([task, reader]))
            self.assertEqual(reader.result(),
                             b'dom0\0admin.Events\0dom0\0\0')
            self.assertIsInstance(task.result()[0], asyncio.StreamReader)
            cleanup_func = task.result()[1]
            cleanup_func()
    finally:
        sock2.close()
        # run socket cleanup functions: stop() before run_forever() makes
        # the loop process its already-scheduled callbacks once and return
        loop.stop()
        loop.run_forever()
        loop.close()
# (web-page residue, not part of the test module: "comment list" /
# "article table of contents")