async def test_scp(event_loop):
    """Round-trip a file to and from a newly added machine via SCP.

    Steps:
      1. Open a ``base.CleanModel`` and add one machine to it.
      2. Wait (up to 240 s) until the model reports at least one machine.
      3. Wait (up to 480 s) until machine ``'0'`` has status ``'running'``
         and agent status ``'started'``.
      4. Copy a local temp file containing ``b'testcontents'`` to the
         machine as ``'testfile'``.
      5. Copy ``'testfile'`` back into a second temp file and assert the
         bytes match.

    Args:
        event_loop: Not referenced in the body; presumably the event-loop
            fixture supplied by the async test plugin -- TODO confirm.
    """
    # NOTE(review): this must be a coroutine function because the body uses
    # ``async with`` / ``await``; a plain ``def`` is a SyntaxError. Running it
    # needs an async-aware test runner -- confirm the appropriate
    # marker/decorator is applied where this test is defined.
    async with base.CleanModel() as model:
        await model.add_machine()

        # Block until the machine shows up in the model's machine mapping.
        await asyncio.wait_for(
            model.block_until(lambda: model.machines),
            timeout=240)
        machine = model.machines['0']

        # SCP is only attempted once both the machine and its agent are up.
        await asyncio.wait_for(
            model.block_until(lambda: (machine.status == 'running' and
                                       machine.agent_status == 'started')),
            timeout=480)

        with NamedTemporaryFile() as f:
            f.write(b'testcontents')
            # Flush so the bytes are on disk before the copy reads the path.
            f.flush()
            await machine.scp_to(f.name, 'testfile')

        with NamedTemporaryFile() as f:
            await machine.scp_from('testfile', f.name)
            assert f.read() == b'testcontents'
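# Scraped page residue (not part of the test): "comment list"
# Scraped page residue (not part of the test): "article table of contents"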