def test_6_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
"""
# Unclear why this is borked on Py3, but it is, and does not seem worth
# pursuing at the moment.
# XXX: It's the release of the references to e.g packetizer that fails
# in py3...
if not PY2:
return
threading.Thread(target=self._run).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
self.assertTrue(p() is not None)
self.tc.close()
del self.tc
# hrm, sometimes p isn't cleared right away. why is that?
#st = time.time()
#while (time.time() - st < 5.0) and (p() is not None):
# time.sleep(0.1)
# instead of dumbly waiting for the GC to collect, force a collection
# to see whether the SSHClient object is deallocated correctly
import gc
gc.collect()
self.assertTrue(p() is None)
评论列表
文章目录