def test_write_pty(self):
master, slave = os.openpty()
slave_write_obj = io.open(slave, 'wb', 0)
proto = MyWritePipeProto(loop=self.loop)
connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
transport, p = self.loop.run_until_complete(connect)
self.assertIs(p, proto)
self.assertIs(transport, proto.transport)
self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
data = bytearray()
def reader(data):
chunk = os.read(master, 1024)
data += chunk
return len(data)
test_utils.run_until(self.loop, lambda: reader(data) >= 1,
timeout=10)
self.assertEqual(b'1', data)
transport.write(b'2345')
test_utils.run_until(self.loop, lambda: reader(data) >= 5,
timeout=10)
self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(master)
# extra info is available
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
# close connection
proto.transport.close()
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
评论列表
文章目录