def tearDown(self):
contexts = set([self.context])
while self.sockets:
sock = self.sockets.pop()
contexts.add(sock.context) # in case additional contexts are created
sock.close()
try:
gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
except gevent.Timeout:
raise RuntimeError("context could not terminate, open sockets likely remain in test")
评论列表
文章目录